Skip to content

Commit

Permalink
Make VertxGrpcExporter more robust
Browse files Browse the repository at this point in the history
Introduce retry when a network related error occurs

Relates to: quarkusio#35686

(cherry picked from commit 1fe7bad)
  • Loading branch information
geoand authored and gsmet committed Feb 26, 2024
1 parent 975dbe1 commit 1a53bdb
Showing 1 changed file with 99 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import io.opentelemetry.sdk.trace.data.SpanData;
import io.opentelemetry.sdk.trace.export.SpanExporter;
import io.quarkus.vertx.core.runtime.BufferOutputStream;
import io.smallrye.mutiny.Uni;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
Expand All @@ -43,6 +45,7 @@ final class VertxGrpcExporter implements SpanExporter {
private static final String GRPC_MESSAGE = "grpc-message";

private static final Logger internalLogger = Logger.getLogger(VertxGrpcExporter.class.getName());
private static final int MAX_ATTEMPTS = 3;

private final ThrottlingLogger logger = new ThrottlingLogger(internalLogger); // TODO: is there something in JBoss Logging we can use?

Expand Down Expand Up @@ -87,30 +90,35 @@ private CompletableResultCode export(TraceRequestMarshaler marshaler, int numIte
exporterMetrics.addSeen(numItems);

var result = new CompletableResultCode();
var onSuccessHandler = new ClientRequestOnSuccessHandler(headers, compressionEnabled, exporterMetrics, marshaler,
loggedUnimplemented, logger, type, numItems, result);
client.request(server)
.onSuccess(onSuccessHandler)
.onFailure(new Handler<>() {
@Override
public void handle(Throwable t) {
// TODO: is there a better way todo retry?
// TODO: should we only retry on a specific errors?

client.request(server)
.onSuccess(onSuccessHandler)
.onFailure(new Handler<>() {
@Override
public void handle(Throwable event) {
failOnClientRequest(numItems, t, result);
}
});
}
});
var onSuccessHandler = new ClientRequestOnSuccessHandler(client, server, headers, compressionEnabled, exporterMetrics,
marshaler,
loggedUnimplemented, logger, type, numItems, result, 1);

initiateSend(client, server, MAX_ATTEMPTS, onSuccessHandler, new Consumer<>() {
@Override
public void accept(Throwable throwable) {
failOnClientRequest(numItems, throwable, result);
}
});

return result;
}

private static void initiateSend(GrpcClient client, SocketAddress server,
int numberOfAttempts,
Handler<GrpcClientRequest<Buffer, Buffer>> onSuccessHandler,
Consumer<Throwable> onFailureCallback) {
Future<GrpcClientRequest<Buffer, Buffer>> reqFuture = client.request(server);
Uni.createFrom().completionStage(reqFuture.toCompletionStage()).onFailure().retry().withBackOff(Duration.ofMillis(100))
.atMost(numberOfAttempts).subscribe().with(
new Consumer<>() {
@Override
public void accept(GrpcClientRequest<Buffer, Buffer> request) {
onSuccessHandler.handle(request);
}
}, onFailureCallback);
}

private void failOnClientRequest(int numItems, Throwable t, CompletableResultCode result) {
exporterMetrics.addFailed(numItems);
logger.log(
Expand Down Expand Up @@ -146,6 +154,8 @@ public CompletableResultCode shutdown() {

private static final class ClientRequestOnSuccessHandler implements Handler<GrpcClientRequest<Buffer, Buffer>> {

private final GrpcClient client;
private final SocketAddress server;
private final Map<String, String> headers;
private final boolean compressionEnabled;
private final ExporterMetrics exporterMetrics;
Expand All @@ -157,15 +167,22 @@ private static final class ClientRequestOnSuccessHandler implements Handler<Grpc
private final int numItems;
private final CompletableResultCode result;

public ClientRequestOnSuccessHandler(Map<String, String> headers,
private final int attemptNumber;

public ClientRequestOnSuccessHandler(GrpcClient client,
SocketAddress server,
Map<String, String> headers,
boolean compressionEnabled,
ExporterMetrics exporterMetrics,
TraceRequestMarshaler marshaler,
AtomicBoolean loggedUnimplemented,
ThrottlingLogger logger,
String type,
int numItems,
CompletableResultCode result) {
CompletableResultCode result,
int attemptNumber) {
this.client = client;
this.server = server;
this.headers = headers;
this.compressionEnabled = compressionEnabled;
this.exporterMetrics = exporterMetrics;
Expand All @@ -175,6 +192,7 @@ public ClientRequestOnSuccessHandler(Map<String, String> headers,
this.type = type;
this.numItems = numItems;
this.result = result;
this.attemptNumber = attemptNumber;
}

@Override
Expand Down Expand Up @@ -205,14 +223,28 @@ public void handle(GrpcClientResponse<Buffer, Buffer> response) {
response.exceptionHandler(new Handler<>() {
@Override
public void handle(Throwable t) {
exporterMetrics.addFailed(numItems);
logger.log(
Level.SEVERE,
"Failed to export "
+ type
+ "s. The stream failed. Full error message: "
+ t.getMessage());
result.fail();
if (attemptNumber <= MAX_ATTEMPTS) {
// retry
initiateSend(client, server,
MAX_ATTEMPTS - attemptNumber,
newAttempt(),
new Consumer<>() {
@Override
public void accept(Throwable throwable) {
failOnClientRequest(numItems, throwable, result);
}
});

} else {
exporterMetrics.addFailed(numItems);
logger.log(
Level.SEVERE,
"Failed to export "
+ type
+ "s. The stream failed. Full error message: "
+ t.getMessage());
result.fail();
}
}
}).errorHandler(new Handler<>() {
@Override
Expand Down Expand Up @@ -336,14 +368,27 @@ private String getStatusMessage(GrpcClientResponse<Buffer, Buffer> response) {
}).onFailure(new Handler<>() {
@Override
public void handle(Throwable t) {
exporterMetrics.addFailed(numItems);
logger.log(
Level.SEVERE,
"Failed to export "
+ type
+ "s. The request could not be executed. Full error message: "
+ t.getMessage());
result.fail();
if (attemptNumber <= MAX_ATTEMPTS) {
// retry
initiateSend(client, server,
MAX_ATTEMPTS - attemptNumber,
newAttempt(),
new Consumer<>() {
@Override
public void accept(Throwable throwable) {
failOnClientRequest(numItems, throwable, result);
}
});
} else {
exporterMetrics.addFailed(numItems);
logger.log(
Level.SEVERE,
"Failed to export "
+ type
+ "s. The request could not be executed. Full error message: "
+ t.getMessage());
result.fail();
}
}
});
} catch (IOException e) {
Expand All @@ -357,5 +402,21 @@ public void handle(Throwable t) {
result.fail();
}
}

private void failOnClientRequest(int numItems, Throwable t, CompletableResultCode result) {
exporterMetrics.addFailed(numItems);
logger.log(
Level.SEVERE,
"Failed to export "
+ type
+ "s. The request could not be executed. Full error message: "
+ t.getMessage());
result.fail();
}

public ClientRequestOnSuccessHandler newAttempt() {
return new ClientRequestOnSuccessHandler(client, server, headers, compressionEnabled, exporterMetrics, marshaler,
loggedUnimplemented, logger, type, numItems, result, attemptNumber + 1);
}
}
}

0 comments on commit 1a53bdb

Please sign in to comment.