diff --git a/protos/feast/core/Store.proto b/protos/feast/core/Store.proto index 41a76a11c2c..58d02874cec 100644 --- a/protos/feast/core/Store.proto +++ b/protos/feast/core/Store.proto @@ -21,6 +21,8 @@ option java_package = "feast.proto.core"; option java_outer_classname = "StoreProto"; option go_package = "github.com/feast-dev/feast/sdk/go/protos/feast/core"; +import "google/protobuf/duration.proto"; + // Store provides a location where Feast reads and writes feature values. // Feature values will be written to the Store in the form of FeatureRow elements. // The way FeatureRow is encoded and decoded when it is written to and read from @@ -86,6 +88,8 @@ message Store { REPLICA_PREFERRED = 3; } ReadFrom read_from = 8; + // Optional. Timeout on waiting response from redis node + google.protobuf.Duration timeout = 9; } message Subscription { diff --git a/serving/pom.xml b/serving/pom.xml index b8f675dd305..a47a33ef33e 100644 --- a/serving/pom.xml +++ b/serving/pom.xml @@ -89,7 +89,13 @@ feast-common ${project.version} - + + + io.lettuce + lettuce-core + 6.0.2.RELEASE + + org.slf4j @@ -197,6 +203,7 @@ simpleclient_servlet 0.8.0 + io.prometheus simpleclient_spring_boot diff --git a/serving/src/main/resources/application.yml b/serving/src/main/resources/application.yml index 288ec7eb972..536f4fb02a7 100644 --- a/serving/src/main/resources/application.yml +++ b/serving/src/main/resources/application.yml @@ -56,6 +56,7 @@ feast: # Connection string specifies the host:port of Redis instances in the redis cluster. connection_string: "localhost:7000,localhost:7001,localhost:7002,localhost:7003,localhost:7004,localhost:7005" read_from: MASTER + timeout: 0.5s subscriptions: - name: "*" project: "*" diff --git a/storage/connectors/redis/pom.xml b/storage/connectors/redis/pom.xml index bbda8dab27f..f65cbd0f96b 100644 --- a/storage/connectors/redis/pom.xml +++ b/storage/connectors/redis/pom.xml @@ -16,6 +16,14 @@ io.lettuce lettuce-core + 6.0.2.RELEASE + + + + io.netty + netty-transport-native-epoll + 4.1.52.Final + linux-x86_64 diff --git a/storage/connectors/redis/src/main/java/feast/storage/connectors/redis/retriever/RedisClusterClient.java b/storage/connectors/redis/src/main/java/feast/storage/connectors/redis/retriever/RedisClusterClient.java index aeb8220b0cb..c1184865b7f 100644 --- a/storage/connectors/redis/src/main/java/feast/storage/connectors/redis/retriever/RedisClusterClient.java +++ b/storage/connectors/redis/src/main/java/feast/storage/connectors/redis/retriever/RedisClusterClient.java @@ -21,13 +21,13 @@ import feast.proto.core.StoreProto.Store.RedisClusterConfig; import feast.storage.connectors.redis.serializer.RedisKeyPrefixSerializerV2; import feast.storage.connectors.redis.serializer.RedisKeySerializerV2; -import io.lettuce.core.KeyValue; -import io.lettuce.core.ReadFrom; -import io.lettuce.core.RedisFuture; -import io.lettuce.core.RedisURI; +import io.lettuce.core.*; +import io.lettuce.core.cluster.ClusterClientOptions; +import io.lettuce.core.cluster.ClusterTopologyRefreshOptions; import io.lettuce.core.cluster.api.StatefulRedisClusterConnection; import io.lettuce.core.cluster.api.async.RedisAdvancedClusterAsyncCommands; import io.lettuce.core.codec.ByteArrayCodec; +import java.time.Duration; import java.util.Arrays; import java.util.List; import java.util.Map; @@ -100,9 +100,28 @@ public static RedisClientAdapter create(StoreProto.Store.RedisClusterConfig conf return RedisURI.create(hostPortSplit[0], Integer.parseInt(hostPortSplit[1])); }) .collect(Collectors.toList()); + io.lettuce.core.cluster.RedisClusterClient client = + io.lettuce.core.cluster.RedisClusterClient.create(redisURIList); + + Duration timeout; + if (config.hasTimeout()) { + timeout = + Duration.ofSeconds(config.getTimeout().getSeconds(), config.getTimeout().getNanos()); + } else { + timeout = Duration.ofSeconds(10); + } + + client.setOptions( + ClusterClientOptions.builder() + .socketOptions(SocketOptions.builder().keepAlive(true).tcpNoDelay(true).build()) + .timeoutOptions(TimeoutOptions.enabled(timeout)) + .pingBeforeActivateConnection(true) + .topologyRefreshOptions( + ClusterTopologyRefreshOptions.builder().enableAllAdaptiveRefreshTriggers().build()) + .build()); + StatefulRedisClusterConnection connection = - io.lettuce.core.cluster.RedisClusterClient.create(redisURIList) - .connect(new ByteArrayCodec()); + client.connect(new ByteArrayCodec()); connection.setReadFrom(PROTO_TO_LETTUCE_TYPES.get(config.getReadFrom()));