Commit f4bb00c
clean up Jedis connection after use (#116)
Signed-off-by: Khor Shu Heng <khor.heng@gojek.com>

Co-authored-by: Khor Shu Heng <khor.heng@gojek.com>
khorshuheng committed Feb 23, 2022
1 parent 10d0e7d commit f4bb00c
Showing 4 changed files with 42 additions and 16 deletions.
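
In short: before this change, the sink opened a raw Jedis connection in every partition just to sniff cluster mode, and that connection was never explicitly closed. The commit detects cluster mode once, reuses the answer, and closes every client it opens. As a rough sketch of the pattern being adopted (illustration only, not code from this commit; withJedis is a hypothetical helper):

  import redis.clients.jedis.Jedis

  // Hypothetical loan-pattern helper: open a connection, run the body, and
  // always release the connection afterwards, even if the body throws.
  def withJedis[T](host: String, port: Int)(body: Jedis => T): T = {
    val jedis = new Jedis(host, port)
    try body(jedis)
    finally jedis.close()
  }

  // Example: the connection is closed whether or not ping() fails.
  val reply = withJedis("localhost", 6379)(_.ping())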
ClusterPipelineProvider.scala

@@ -38,4 +38,8 @@ case class ClusterPipelineProvider(endpoint: RedisEndpoint) extends PipelineProvider {
     */
   override def pipeline(): UnifiedPipeline = new ClusterPipeline(provider)
 
+  /**
+    * Close client connection
+    */
+  override def close(): Unit = {}
 }
PipelineProvider.scala

@@ -31,4 +31,9 @@ trait PipelineProvider {
     * @return an interface for executing pipeline commands
     */
   def pipeline(): UnifiedPipeline
+
+  /**
+    * Close client connection
+    */
+  def close(): Unit
 }
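
With close() added to the trait, a provider has an explicit lifecycle: hand out pipelines, then release the underlying client exactly once. A minimal sketch of a caller honoring that contract (writeAll is hypothetical, and the sketch assumes it lives in the same package as PipelineProvider and UnifiedPipeline):

  // Hypothetical caller: open one pipeline per batch of operations,
  // close each pipeline, and close the provider once all work is done.
  def writeAll(provider: PipelineProvider, batches: Seq[Seq[UnifiedPipeline => Unit]]): Unit =
    try {
      batches.foreach { batch =>
        val pipeline = provider.pipeline()
        batch.foreach(op => op(pipeline))
        pipeline.close() // release this pipeline, as RedisSinkRelation does below
      }
    } finally {
      provider.close() // the new hook: release the client connection
    }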
RedisSinkRelation.scala

@@ -18,7 +18,6 @@ package feast.ingestion.stores.redis
 
 import java.{sql, util}
 import com.google.protobuf.Timestamp
-import com.google.protobuf.util.Timestamps
 import feast.ingestion.utils.TypeConversion
 import org.apache.commons.codec.digest.DigestUtils
 import org.apache.spark.{SparkConf, SparkEnv}
@@ -30,7 +29,7 @@ import org.apache.spark.sql.{DataFrame, Row, SQLContext}
 import redis.clients.jedis.Jedis
 
 import scala.collection.JavaConverters._
-import scala.util.{Failure, Success, Try}
+import scala.util.Try
 
 /**
  * High-level writer to Redis. Relies on `Persistence` implementation for actual storage layout.
@@ -54,6 +53,21 @@ class RedisSinkRelation(override val sqlContext: SQLContext, config: SparkRedisConfig)
 
   val sparkConf: SparkConf = sqlContext.sparkContext.getConf
 
+  def newJedisClient(endpoint: RedisEndpoint): Jedis = {
+    val jedis = new Jedis(endpoint.host, endpoint.port)
+    if (endpoint.password.nonEmpty) {
+      jedis.auth(endpoint.password)
+    }
+    jedis
+  }
+
+  def checkIfInClusterMode(endpoint: RedisEndpoint): Boolean = {
+    val jedis = newJedisClient(endpoint)
+    val isCluster = Try(jedis.clusterInfo()).isSuccess
+    jedis.close()
+    isCluster
+  }
+
   override def insert(data: DataFrame, overwrite: Boolean): Unit = {
     // repartition for deduplication
     val dataToStore =
@@ -63,21 +77,19 @@ class RedisSinkRelation(override val sqlContext: SQLContext, config: SparkRedisConfig)
         .localCheckpoint()
       else data
 
+    val endpoint = RedisEndpoint(
+      host = sparkConf.get("spark.redis.host"),
+      port = sparkConf.get("spark.redis.port").toInt,
+      password = sparkConf.get("spark.redis.password", "")
+    )
+
+    val isClusterMode = checkIfInClusterMode(endpoint)
+
     dataToStore.foreachPartition { partition: Iterator[Row] =>
-      val endpoint = RedisEndpoint(
-        host = sparkConf.get("spark.redis.host"),
-        port = sparkConf.get("spark.redis.port").toInt,
-        password = sparkConf.get("spark.redis.password", "")
-      )
-      val jedis = new Jedis(endpoint.host, endpoint.port)
-      if (endpoint.password.nonEmpty) {
-        jedis.auth(endpoint.password)
-      }
-      val pipelineProvider = Try(jedis.clusterInfo()) match {
-        case Success(_) =>
-          ClusterPipelineProvider(endpoint)
-        case Failure(_) =>
-          SingleNodePipelineProvider(jedis)
+      val pipelineProvider = if (isClusterMode) {
+        ClusterPipelineProvider(endpoint)
+      } else {
+        SingleNodePipelineProvider(newJedisClient(endpoint))
       }
 
       // grouped iterator to only allocate memory for a portion of rows
@@ -125,6 +137,7 @@ class RedisSinkRelation(override val sqlContext: SQLContext, config: SparkRedisConfig)
         }
         writePipeline.close()
       }
+      pipelineProvider.close()
     }
     dataToStore.unpersist()
   }
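
Note that insert() now probes for cluster mode once on the driver rather than once per partition, and the probe connection itself is closed. The same probe in isolation (a sketch; isClusterNode is a hypothetical name, host and port are placeholders):

  import redis.clients.jedis.Jedis
  import scala.util.Try

  // Mirrors checkIfInClusterMode above: a node whose CLUSTER INFO call fails
  // is treated as single-node, and the probe connection is closed either way.
  def isClusterNode(host: String, port: Int): Boolean = {
    val jedis = new Jedis(host, port)
    try Try(jedis.clusterInfo()).isSuccess
    finally jedis.close()
  }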
SingleNodePipelineProvider.scala

@@ -28,4 +28,8 @@ case class SingleNodePipelineProvider(jedis: Jedis) extends PipelineProvider {
     */
   override def pipeline(): UnifiedPipeline = jedis.pipelined()
 
+  /**
+    * Close client connection
+    */
+  override def close(): Unit = jedis.close()
 }
