From 40d8b246a101e4fd6a667b24e62ea0f0caa3ea11 Mon Sep 17 00:00:00 2001 From: Jose Acevedo Date: Wed, 29 May 2024 16:35:10 -0500 Subject: [PATCH] fix: Make Java gRPC client use timeouts as expected Signed-off-by: Jose Acevedo --- .../src/main/java/dev/feast/FeastClient.java | 38 +++++++++++-------- .../test/java/dev/feast/FeastClientTest.java | 5 +-- 2 files changed, 25 insertions(+), 18 deletions(-) diff --git a/java/serving-client/src/main/java/dev/feast/FeastClient.java b/java/serving-client/src/main/java/dev/feast/FeastClient.java index b2767e3c90..c14d3be586 100644 --- a/java/serving-client/src/main/java/dev/feast/FeastClient.java +++ b/java/serving-client/src/main/java/dev/feast/FeastClient.java @@ -26,7 +26,6 @@ import feast.proto.serving.ServingServiceGrpc.ServingServiceBlockingStub; import feast.proto.types.ValueProto; import io.grpc.CallCredentials; -import io.grpc.Deadline; import io.grpc.ManagedChannel; import io.grpc.ManagedChannelBuilder; import io.grpc.netty.shaded.io.grpc.netty.GrpcSslContexts; @@ -50,6 +49,7 @@ public class FeastClient implements AutoCloseable { private final ManagedChannel channel; private final ServingServiceBlockingStub stub; + private final long requestTimeout; /** * Create a client to access Feast Serving. @@ -68,12 +68,14 @@ public static FeastClient create(String host, int port) { * * @param host hostname or ip address of Feast serving GRPC server * @param port port number of Feast serving GRPC server - * @param deadline GRPC deadline of Feast serving GRPC server {@link Deadline} + * @param requestTimeout maximum duration for online retrievals from the GRPC server in + * milliseconds, use 0 for no timeout * @return {@link FeastClient} */ - public static FeastClient create(String host, int port, Deadline deadline) { + public static FeastClient create(String host, int port, long requestTimeout) { // configure client with no security config. - return FeastClient.createSecure(host, port, SecurityConfig.newBuilder().build(), deadline); + return FeastClient.createSecure( + host, port, SecurityConfig.newBuilder().build(), requestTimeout); } /** @@ -86,7 +88,7 @@ public static FeastClient create(String host, int port, Deadline deadline) { * @return {@link FeastClient} */ public static FeastClient createSecure(String host, int port, SecurityConfig securityConfig) { - return createSecure(host, port, securityConfig, null); + return FeastClient.createSecure(host, port, securityConfig, 0); } /** @@ -96,11 +98,17 @@ public static FeastClient createSecure(String host, int port, SecurityConfig sec * @param port port number of Feast serving GRPC server * @param securityConfig security options to configure the Feast client. See {@link * SecurityConfig} for options. - * @param deadline GRPC deadline of Feast serving GRPC server {@link Deadline} + * @param requestTimeout maximum duration for online retrievals from the GRPC server in + * milliseconds * @return {@link FeastClient} */ public static FeastClient createSecure( - String host, int port, SecurityConfig securityConfig, Deadline deadline) { + String host, int port, SecurityConfig securityConfig, long requestTimeout) { + + if (requestTimeout < 0) { + throw new IllegalArgumentException("Request timeout can't be negative"); + } + // Configure client TLS ManagedChannel channel = null; if (securityConfig.isTLSEnabled()) { @@ -127,7 +135,7 @@ public static FeastClient createSecure( channel = ManagedChannelBuilder.forAddress(host, port).usePlaintext().build(); } - return new FeastClient(channel, securityConfig.getCredentials(), Optional.ofNullable(deadline)); + return new FeastClient(channel, securityConfig.getCredentials(), requestTimeout); } /** @@ -158,7 +166,10 @@ public List getOnlineFeatures(List featureRefs, List entities) requestBuilder.putAllEntities(getEntityValuesMap(entities)); - GetOnlineFeaturesResponse response = stub.getOnlineFeatures(requestBuilder.build()); + ServingServiceGrpc.ServingServiceBlockingStub timedStub = + requestTimeout != 0 ? stub.withDeadlineAfter(requestTimeout, TimeUnit.MILLISECONDS) : stub; + + GetOnlineFeaturesResponse response = timedStub.getOnlineFeatures(requestBuilder.build()); List results = Lists.newArrayList(); if (response.getResultsCount() == 0) { @@ -231,12 +242,13 @@ public List getOnlineFeatures(List featureRefs, List rows, Str } protected FeastClient(ManagedChannel channel, Optional credentials) { - this(channel, credentials, Optional.empty()); + this(channel, credentials, 0); } protected FeastClient( - ManagedChannel channel, Optional credentials, Optional deadline) { + ManagedChannel channel, Optional credentials, long requestTimeout) { this.channel = channel; + this.requestTimeout = requestTimeout; TracingClientInterceptor tracingInterceptor = TracingClientInterceptor.newBuilder().withTracer(GlobalTracer.get()).build(); @@ -247,10 +259,6 @@ protected FeastClient( servingStub = servingStub.withCallCredentials(credentials.get()); } - if (deadline.isPresent()) { - servingStub = servingStub.withDeadline(deadline.get()); - } - this.stub = servingStub; } diff --git a/java/serving-client/src/test/java/dev/feast/FeastClientTest.java b/java/serving-client/src/test/java/dev/feast/FeastClientTest.java index 9846cff9f7..cbd4b0016e 100644 --- a/java/serving-client/src/test/java/dev/feast/FeastClientTest.java +++ b/java/serving-client/src/test/java/dev/feast/FeastClientTest.java @@ -38,7 +38,6 @@ import java.util.HashMap; import java.util.List; import java.util.Optional; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import org.junit.Before; import org.junit.Rule; @@ -46,7 +45,7 @@ public class FeastClientTest { private final String AUTH_TOKEN = "test token"; - private final Deadline DEADLINE = Deadline.after(2, TimeUnit.SECONDS); + private final long TIMEOUT_MILLIS = 300; @Rule public GrpcCleanupRule grpcRule; private AtomicBoolean isAuthenticated; @@ -88,7 +87,7 @@ public void setup() throws Exception { ManagedChannel channel = this.grpcRule.register( InProcessChannelBuilder.forName(serverName).directExecutor().build()); - this.client = new FeastClient(channel, Optional.empty(), Optional.of(DEADLINE)); + this.client = new FeastClient(channel, Optional.empty(), TIMEOUT_MILLIS); } @Test