Instantiate Jedis client lazily and only once per JVM process (#152)
Signed-off-by: khorshuheng <khor.heng@gojek.com>

Co-authored-by: khorshuheng <khor.heng@gojek.com>
khorshuheng committed Jun 21, 2022
1 parent 311db46 commit 58ebe0f
Showing 1 changed file with 6 additions and 7 deletions.
@@ -67,6 +67,12 @@ class RedisSinkRelation(override val sqlContext: SQLContext, config: SparkRedisC
 
   lazy val isClusterMode: Boolean = checkIfInClusterMode(endpoint)
 
+  lazy val pipelineProvider: PipelineProvider = if (isClusterMode) {
+    ClusterPipelineProvider(endpoint)
+  } else {
+    SingleNodePipelineProvider(newJedisClient(endpoint))
+  }
+
   def newJedisClient(endpoint: RedisEndpoint): Jedis = {
     val jedis = new Jedis(endpoint.host, endpoint.port)
     if (endpoint.password.nonEmpty) {
@@ -95,12 +101,6 @@ class RedisSinkRelation(override val sqlContext: SQLContext, config: SparkRedisC
       java.security.Security.setProperty("networkaddress.cache.ttl", "3");
       java.security.Security.setProperty("networkaddress.cache.negative.ttl", "0");
 
-      val pipelineProvider = if (isClusterMode) {
-        ClusterPipelineProvider(endpoint)
-      } else {
-        SingleNodePipelineProvider(newJedisClient(endpoint))
-      }
-
       // grouped iterator to only allocate memory for a portion of rows
       partition.grouped(properties.pipelineSize).foreach { batch =>
         // group by key and keep only latest row per each key
@@ -146,7 +146,6 @@ class RedisSinkRelation(override val sqlContext: SQLContext, config: SparkRedisC
         }
         writePipeline.close()
       }
-      pipelineProvider.close()
     }
     dataToStore.unpersist()
   }
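For context, here is a minimal sketch of the pattern this commit applies. It is not code from this repository, and the Client and Sink names are hypothetical: a Scala lazy val initializer runs at most once per enclosing instance and that initialization is thread-safe, so hoisting the provider out of the per-partition write path means the client is built on first use and then reused, instead of a new connection being opened for every partition write.

// Hypothetical sketch: lazy, initialize-once client reuse (names are not from the repo).
final class Client(host: String, port: Int) {
  println(s"connecting to $host:$port") // runs only when the Client is actually constructed
}

final class Sink(host: String, port: Int) {
  // Constructed on first access and cached for the lifetime of this instance;
  // Scala evaluates a lazy val initializer at most once, even under concurrent access.
  lazy val client: Client = new Client(host, port)

  def writeBatch(rows: Seq[String]): Unit = {
    val c = client          // first call triggers construction; later calls reuse it
    rows.foreach(_ => ())   // pipelined writes through `c` would go here
  }
}

object Demo extends App {
  val sink = new Sink("localhost", 6379)
  sink.writeBatch(Seq("a"))
  sink.writeBatch(Seq("b")) // no second "connecting to ..." line is printed
}

Dropping pipelineProvider.close() in the last hunk is consistent with the same change: a provider that is meant to be reused across writes should not be closed at the end of each partition write.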
