From 1a53bdbfb35c04852e8957d692864bfea03bf3f6 Mon Sep 17 00:00:00 2001 From: Georgios Andrianakis Date: Tue, 20 Feb 2024 18:01:00 +0200 Subject: [PATCH] Make VertxGrpcExporter more robust Introduce retry when a network related error occurs Relates to: #35686 (cherry picked from commit 1fe7badeb98109b0c5d457a77ddc7e313fa02625) --- .../exporter/otlp/VertxGrpcExporter.java | 137 +++++++++++++----- 1 file changed, 99 insertions(+), 38 deletions(-) diff --git a/extensions/opentelemetry/runtime/src/main/java/io/quarkus/opentelemetry/runtime/exporter/otlp/VertxGrpcExporter.java b/extensions/opentelemetry/runtime/src/main/java/io/quarkus/opentelemetry/runtime/exporter/otlp/VertxGrpcExporter.java index 5e034cdbd2416..7d029b52fe9e5 100644 --- a/extensions/opentelemetry/runtime/src/main/java/io/quarkus/opentelemetry/runtime/exporter/otlp/VertxGrpcExporter.java +++ b/extensions/opentelemetry/runtime/src/main/java/io/quarkus/opentelemetry/runtime/exporter/otlp/VertxGrpcExporter.java @@ -21,6 +21,8 @@ import io.opentelemetry.sdk.trace.data.SpanData; import io.opentelemetry.sdk.trace.export.SpanExporter; import io.quarkus.vertx.core.runtime.BufferOutputStream; +import io.smallrye.mutiny.Uni; +import io.vertx.core.Future; import io.vertx.core.Handler; import io.vertx.core.Vertx; import io.vertx.core.buffer.Buffer; @@ -43,6 +45,7 @@ final class VertxGrpcExporter implements SpanExporter { private static final String GRPC_MESSAGE = "grpc-message"; private static final Logger internalLogger = Logger.getLogger(VertxGrpcExporter.class.getName()); + private static final int MAX_ATTEMPTS = 3; private final ThrottlingLogger logger = new ThrottlingLogger(internalLogger); // TODO: is there something in JBoss Logging we can use? @@ -87,30 +90,35 @@ private CompletableResultCode export(TraceRequestMarshaler marshaler, int numIte exporterMetrics.addSeen(numItems); var result = new CompletableResultCode(); - var onSuccessHandler = new ClientRequestOnSuccessHandler(headers, compressionEnabled, exporterMetrics, marshaler, - loggedUnimplemented, logger, type, numItems, result); - client.request(server) - .onSuccess(onSuccessHandler) - .onFailure(new Handler<>() { - @Override - public void handle(Throwable t) { - // TODO: is there a better way todo retry? - // TODO: should we only retry on a specific errors? - - client.request(server) - .onSuccess(onSuccessHandler) - .onFailure(new Handler<>() { - @Override - public void handle(Throwable event) { - failOnClientRequest(numItems, t, result); - } - }); - } - }); + var onSuccessHandler = new ClientRequestOnSuccessHandler(client, server, headers, compressionEnabled, exporterMetrics, + marshaler, + loggedUnimplemented, logger, type, numItems, result, 1); + + initiateSend(client, server, MAX_ATTEMPTS, onSuccessHandler, new Consumer<>() { + @Override + public void accept(Throwable throwable) { + failOnClientRequest(numItems, throwable, result); + } + }); return result; } + private static void initiateSend(GrpcClient client, SocketAddress server, + int numberOfAttempts, + Handler> onSuccessHandler, + Consumer onFailureCallback) { + Future> reqFuture = client.request(server); + Uni.createFrom().completionStage(reqFuture.toCompletionStage()).onFailure().retry().withBackOff(Duration.ofMillis(100)) + .atMost(numberOfAttempts).subscribe().with( + new Consumer<>() { + @Override + public void accept(GrpcClientRequest request) { + onSuccessHandler.handle(request); + } + }, onFailureCallback); + } + private void failOnClientRequest(int numItems, Throwable t, CompletableResultCode result) { exporterMetrics.addFailed(numItems); logger.log( @@ -146,6 +154,8 @@ public CompletableResultCode shutdown() { private static final class ClientRequestOnSuccessHandler implements Handler> { + private final GrpcClient client; + private final SocketAddress server; private final Map headers; private final boolean compressionEnabled; private final ExporterMetrics exporterMetrics; @@ -157,7 +167,11 @@ private static final class ClientRequestOnSuccessHandler implements Handler headers, + private final int attemptNumber; + + public ClientRequestOnSuccessHandler(GrpcClient client, + SocketAddress server, + Map headers, boolean compressionEnabled, ExporterMetrics exporterMetrics, TraceRequestMarshaler marshaler, @@ -165,7 +179,10 @@ public ClientRequestOnSuccessHandler(Map headers, ThrottlingLogger logger, String type, int numItems, - CompletableResultCode result) { + CompletableResultCode result, + int attemptNumber) { + this.client = client; + this.server = server; this.headers = headers; this.compressionEnabled = compressionEnabled; this.exporterMetrics = exporterMetrics; @@ -175,6 +192,7 @@ public ClientRequestOnSuccessHandler(Map headers, this.type = type; this.numItems = numItems; this.result = result; + this.attemptNumber = attemptNumber; } @Override @@ -205,14 +223,28 @@ public void handle(GrpcClientResponse response) { response.exceptionHandler(new Handler<>() { @Override public void handle(Throwable t) { - exporterMetrics.addFailed(numItems); - logger.log( - Level.SEVERE, - "Failed to export " - + type - + "s. The stream failed. Full error message: " - + t.getMessage()); - result.fail(); + if (attemptNumber <= MAX_ATTEMPTS) { + // retry + initiateSend(client, server, + MAX_ATTEMPTS - attemptNumber, + newAttempt(), + new Consumer<>() { + @Override + public void accept(Throwable throwable) { + failOnClientRequest(numItems, throwable, result); + } + }); + + } else { + exporterMetrics.addFailed(numItems); + logger.log( + Level.SEVERE, + "Failed to export " + + type + + "s. The stream failed. Full error message: " + + t.getMessage()); + result.fail(); + } } }).errorHandler(new Handler<>() { @Override @@ -336,14 +368,27 @@ private String getStatusMessage(GrpcClientResponse response) { }).onFailure(new Handler<>() { @Override public void handle(Throwable t) { - exporterMetrics.addFailed(numItems); - logger.log( - Level.SEVERE, - "Failed to export " - + type - + "s. The request could not be executed. Full error message: " - + t.getMessage()); - result.fail(); + if (attemptNumber <= MAX_ATTEMPTS) { + // retry + initiateSend(client, server, + MAX_ATTEMPTS - attemptNumber, + newAttempt(), + new Consumer<>() { + @Override + public void accept(Throwable throwable) { + failOnClientRequest(numItems, throwable, result); + } + }); + } else { + exporterMetrics.addFailed(numItems); + logger.log( + Level.SEVERE, + "Failed to export " + + type + + "s. The request could not be executed. Full error message: " + + t.getMessage()); + result.fail(); + } } }); } catch (IOException e) { @@ -357,5 +402,21 @@ public void handle(Throwable t) { result.fail(); } } + + private void failOnClientRequest(int numItems, Throwable t, CompletableResultCode result) { + exporterMetrics.addFailed(numItems); + logger.log( + Level.SEVERE, + "Failed to export " + + type + + "s. The request could not be executed. Full error message: " + + t.getMessage()); + result.fail(); + } + + public ClientRequestOnSuccessHandler newAttempt() { + return new ClientRequestOnSuccessHandler(client, server, headers, compressionEnabled, exporterMetrics, marshaler, + loggedUnimplemented, logger, type, numItems, result, attemptNumber + 1); + } } }