From 58ebe0f0089457e5f74604165d5a57fb21cff383 Mon Sep 17 00:00:00 2001 From: Khor Shu Heng <32997938+khorshuheng@users.noreply.github.com> Date: Tue, 21 Jun 2022 13:52:25 +0800 Subject: [PATCH] Instantiate Jedis client lazily and only once per JVM process (#152) Signed-off-by: khorshuheng Co-authored-by: khorshuheng --- .../ingestion/stores/redis/RedisSinkRelation.scala | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/spark/ingestion/src/main/scala/feast/ingestion/stores/redis/RedisSinkRelation.scala b/spark/ingestion/src/main/scala/feast/ingestion/stores/redis/RedisSinkRelation.scala index 81ddc1e..807480e 100644 --- a/spark/ingestion/src/main/scala/feast/ingestion/stores/redis/RedisSinkRelation.scala +++ b/spark/ingestion/src/main/scala/feast/ingestion/stores/redis/RedisSinkRelation.scala @@ -67,6 +67,12 @@ class RedisSinkRelation(override val sqlContext: SQLContext, config: SparkRedisC lazy val isClusterMode: Boolean = checkIfInClusterMode(endpoint) + lazy val pipelineProvider: PipelineProvider = if (isClusterMode) { + ClusterPipelineProvider(endpoint) + } else { + SingleNodePipelineProvider(newJedisClient(endpoint)) + } + def newJedisClient(endpoint: RedisEndpoint): Jedis = { val jedis = new Jedis(endpoint.host, endpoint.port) if (endpoint.password.nonEmpty) { @@ -95,12 +101,6 @@ class RedisSinkRelation(override val sqlContext: SQLContext, config: SparkRedisC java.security.Security.setProperty("networkaddress.cache.ttl", "3"); java.security.Security.setProperty("networkaddress.cache.negative.ttl", "0"); - val pipelineProvider = if (isClusterMode) { - ClusterPipelineProvider(endpoint) - } else { - SingleNodePipelineProvider(newJedisClient(endpoint)) - } - // grouped iterator to only allocate memory for a portion of rows partition.grouped(properties.pipelineSize).foreach { batch => // group by key and keep only latest row per each key @@ -146,7 +146,6 @@ class RedisSinkRelation(override val sqlContext: SQLContext, config: SparkRedisC } writePipeline.close() } - pipelineProvider.close() } dataToStore.unpersist() }