
Commit 3e19765

Uses a separate virtual thread to check if the connection to the server has died. The period to check for that is configurable and set to 5 seconds by default.
spericas committed May 20, 2024
1 parent 2256e17 commit 3e19765
Showing 5 changed files with 52 additions and 21 deletions.
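The new option is exposed through GrpcClientProtocolConfig and can be tuned per client. Below is a minimal sketch of how an application might shorten the heartbeat; the protocolConfig(...) and baseUri(...) builder calls and the example class name are assumptions, not part of this commit (the updated test further down shows the same heartbeatPeriod(...) setter in context).

    import java.time.Duration;

    import io.helidon.webclient.grpc.GrpcClient;
    import io.helidon.webclient.grpc.GrpcClientProtocolConfig;

    class HeartbeatConfigExample {                           // hypothetical example class
        public static void main(String[] args) {
            // Ping the server every second instead of the PT5S default; PT0S would disable the heartbeat.
            GrpcClientProtocolConfig config = GrpcClientProtocolConfig.builder()
                    .heartbeatPeriod(Duration.ofSeconds(1))
                    .build();

            GrpcClient grpcClient = GrpcClient.builder()
                    .protocolConfig(config)                  // assumed builder method
                    .baseUri("https://localhost:8080")       // hypothetical endpoint
                    .build();
        }
    }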
@@ -62,6 +62,7 @@ abstract class GrpcBaseClientCall<ReqT, ResT> extends ClientCall<ReqT, ResT> {
protected static final Header GRPC_ACCEPT_ENCODING = HeaderValues.create(HeaderNames.ACCEPT_ENCODING, "gzip");
protected static final Header GRPC_CONTENT_TYPE = HeaderValues.create(HeaderNames.CONTENT_TYPE, "application/grpc");

protected static final BufferData PING_FRAME = BufferData.create("PING");
protected static final BufferData EMPTY_BUFFER_DATA = BufferData.empty();

private final GrpcClientImpl grpcClient;
@@ -70,6 +71,7 @@ abstract class GrpcBaseClientCall<ReqT, ResT> extends ClientCall<ReqT, ResT> {
private final int initBufferSize;
private final Duration pollWaitTime;
private final boolean abortPollTimeExpired;
private final Duration heartbeatPeriod;

private final MethodDescriptor.Marshaller<ReqT> requestMarshaller;
private final MethodDescriptor.Marshaller<ResT> responseMarshaller;
@@ -88,6 +90,11 @@ abstract class GrpcBaseClientCall<ReqT, ResT> extends ClientCall<ReqT, ResT> {
this.initBufferSize = grpcClient.prototype().protocolConfig().initBufferSize();
this.pollWaitTime = grpcClient.prototype().protocolConfig().pollWaitTime();
this.abortPollTimeExpired = grpcClient.prototype().protocolConfig().abortPollTimeExpired();
this.heartbeatPeriod = grpcClient.prototype().protocolConfig().heartbeatPeriod();
}

protected Duration heartbeatPeriod() {
return heartbeatPeriod;
}

protected boolean abortPollTimeExpired() {
@@ -16,6 +16,8 @@

package io.helidon.webclient.grpc;

import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
@@ -56,6 +58,7 @@ class GrpcClientCall<ReqT, ResT> extends GrpcBaseClientCall<ReqT, ResT> {

private volatile Future<?> readStreamFuture;
private volatile Future<?> writeStreamFuture;
private volatile Future<?> heartbeatFuture;

GrpcClientCall(GrpcClientImpl grpcClient, MethodDescriptor<ReqT, ResT> methodDescriptor, CallOptions callOptions) {
super(grpcClient, methodDescriptor, callOptions);
@@ -75,6 +78,7 @@ public void cancel(String message, Throwable cause) {
responseListener().onClose(Status.CANCELLED, EMPTY_METADATA);
readStreamFuture.cancel(true);
writeStreamFuture.cancel(true);
heartbeatFuture.cancel(true);
close();
}

@@ -100,6 +104,29 @@ public void sendMessage(ReqT message) {
}

protected void startStreamingThreads() {
// heartbeat thread
Duration period = heartbeatPeriod();
if (!period.isZero()) {
heartbeatFuture = executor.submit(() -> {
try {
startWriteBarrier.await();
socket().log(LOGGER, DEBUG, "[Heartbeat thread] started with period " + period);

while (isRemoteOpen()) {
Thread.sleep(period);
if (sendingQueue.isEmpty()) {
sendingQueue.add(PING_FRAME);
socket().log(LOGGER, DEBUG, "[Heartbeat thread] heartbeat queued");
}
}
} catch (Throwable t) {
socket().log(LOGGER, DEBUG, "[Heartbeat thread] exception " + t.getMessage());
}
});
} else {
heartbeatFuture = CompletableFuture.completedFuture(null);
}

// write streaming thread
writeStreamFuture = executor.submit(() -> {
try {
@@ -111,7 +138,11 @@ protected void startStreamingThreads() {
socket().log(LOGGER, DEBUG, "[Writing thread] polling sending queue");
BufferData bufferData = sendingQueue.poll(pollWaitTime().toMillis(), TimeUnit.MILLISECONDS);
if (bufferData != null) {
if (bufferData == EMPTY_BUFFER_DATA) { // end marker
if (bufferData == PING_FRAME) { // ping frame
clientStream().sendPing();
continue;
}
if (bufferData == EMPTY_BUFFER_DATA) { // end marker
socket().log(LOGGER, DEBUG, "[Writing thread] sending queue end marker found");
if (!endOfStream) {
socket().log(LOGGER, DEBUG, "[Writing thread] sending empty buffer to end stream");
@@ -208,24 +239,11 @@ private void drainReceivingQueue() {
}
}

/**
* Handles a read timeout by either aborting or continuing, depending on how the client
* is configured. If not aborting, it will attempt to send a PING frame to check the
* connection health before attempting to proceed.
*
* @param e a stream timeout exception
*/
private void handleStreamTimeout(StreamTimeoutException e) {
// abort or retry based on config settings
if (abortPollTimeExpired()) {
socket().log(LOGGER, ERROR, "[Reading thread] HTTP/2 stream timeout, aborting");
throw e; // caught below
}

// check connection health before proceeding
clientStream().sendPing();

// log and continue if ping did not throw exception
socket().log(LOGGER, ERROR, "[Reading thread] HTTP/2 stream timeout, retrying");
}
}
@@ -52,9 +52,6 @@ default String type() {
/**
* How long to wait for the next HTTP/2 data frame to arrive in underlying stream.
* Whether this is a fatal error or not is controlled by {@link #abortPollTimeExpired()}.
* If {@link #abortPollTimeExpired()} is set to {@code false}, the connection
* health will first be verified by attempting to send a PING frame before
* attempting a new read.
*
* @return poll time as a duration
* @see io.helidon.common.socket.SocketOptions#readTimeout()
@@ -74,6 +71,16 @@ default String type() {
@Option.Default("false")
boolean abortPollTimeExpired();

/**
* How often to send a heartbeat (HTTP/2 ping) to check if the connection is still
* alive. Set the heartbeat to 0 to turn this feature off.
*
* @return heartbeat period
*/
@Option.Configured
@Option.Default("PT5S")
Duration heartbeatPeriod();

/**
* Initial buffer size used to serialize gRPC request payloads. Buffers shall grow
* according to the payload size, but setting this initial buffer size to a larger value
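The heartbeatPeriod javadoc above notes that a zero period disables the heartbeat. A hedged sketch of that combination, reusing the pre-existing pollWaitTime and abortPollTimeExpired options from this interface; the helper class and method are illustrative only, not part of this commit.

    import java.time.Duration;

    import io.helidon.webclient.grpc.GrpcClientProtocolConfig;

    class HeartbeatDisabledExample {                           // hypothetical example class
        static GrpcClientProtocolConfig noHeartbeatConfig() {
            return GrpcClientProtocolConfig.builder()
                    .heartbeatPeriod(Duration.ZERO)            // PT0S turns the background ping off
                    .pollWaitTime(Duration.ofSeconds(2))       // how long a read waits for the next data frame
                    .abortPollTimeExpired(false)               // false: log and retry on timeout instead of failing the call
                    .build();
        }
    }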
@@ -38,7 +38,7 @@
*/
@ServerTest
class GrpcConnectionErrorTest extends GrpcBaseTest {
private static final long TIMEOUT_SECONDS = 1000;
private static final long TIMEOUT_SECONDS = 10;

private final WebServer server;
private final GrpcClient grpcClient;
@@ -54,8 +54,7 @@ private GrpcConnectionErrorTest(WebServer server) {
.keystore(Resource.create("client.p12"))))
.build();
GrpcClientProtocolConfig config = GrpcClientProtocolConfig.builder()
.pollWaitTime(Duration.ofSeconds(2)) // detects connection issues
.abortPollTimeExpired(false) // checks health with PING
.heartbeatPeriod(Duration.ofSeconds(1)) // detects failure faster
.build();
this.grpcClient = GrpcClient.builder()
.tls(clientTls)
2 changes: 1 addition & 1 deletion webclient/tests/grpc/src/test/resources/logging.properties
@@ -21,4 +21,4 @@ handlers=io.helidon.logging.jul.HelidonConsoleHandler
java.util.logging.SimpleFormatter.format=%1$tY.%1$tm.%1$td %1$tH:%1$tM:%1$tS %4$s %3$s !thread!: %5$s%6$s%n

.level=INFO
io.helidon.webclient.grpc.level=FINEST
#io.helidon.webclient.grpc.level=FINEST
