From dfe3d61fa90c0385d5e10737272db47e0fd65e22 Mon Sep 17 00:00:00 2001 From: alanroch Date: Thu, 14 Mar 2019 16:08:09 +0000 Subject: [PATCH 1/4] Issue#140 Add support for SSL https://github.com/RedisLabs/spark-redis/issues/140 --- .../com/redislabs/provider/redis/ConnectionPool.scala | 2 +- .../com/redislabs/provider/redis/RedisConfig.scala | 10 +++++++--- .../scala/com/redislabs/provider/redis/package.scala | 2 +- 3 files changed, 9 insertions(+), 5 deletions(-) diff --git a/src/main/scala/com/redislabs/provider/redis/ConnectionPool.scala b/src/main/scala/com/redislabs/provider/redis/ConnectionPool.scala index 22bfe897..f5322f29 100644 --- a/src/main/scala/com/redislabs/provider/redis/ConnectionPool.scala +++ b/src/main/scala/com/redislabs/provider/redis/ConnectionPool.scala @@ -23,7 +23,7 @@ object ConnectionPool { poolConfig.setMinEvictableIdleTimeMillis(60000) poolConfig.setTimeBetweenEvictionRunsMillis(30000) poolConfig.setNumTestsPerEvictionRun(-1) - new JedisPool(poolConfig, re.host, re.port, re.timeout, re.auth, re.dbNum) + new JedisPool(poolConfig, re.host, re.port, re.timeout, re.auth, re.dbNum, re.ssl) } ) var sleepTime: Int = 4 diff --git a/src/main/scala/com/redislabs/provider/redis/RedisConfig.scala b/src/main/scala/com/redislabs/provider/redis/RedisConfig.scala index fd47c097..b4015ab1 100644 --- a/src/main/scala/com/redislabs/provider/redis/RedisConfig.scala +++ b/src/main/scala/com/redislabs/provider/redis/RedisConfig.scala @@ -17,12 +17,14 @@ import scala.collection.JavaConversions._ * @param port the redis port * @param auth the authentication password * @param dbNum database number (should be avoided in general) + * @param ssl true to enable SSL connection. Defaults to false */ case class RedisEndpoint(host: String = Protocol.DEFAULT_HOST, port: Int = Protocol.DEFAULT_PORT, auth: String = null, dbNum: Int = Protocol.DEFAULT_DATABASE, - timeout: Int = Protocol.DEFAULT_TIMEOUT) + timeout: Int = Protocol.DEFAULT_TIMEOUT, + ssl: Boolean = false) extends Serializable { /** @@ -37,7 +39,8 @@ case class RedisEndpoint(host: String = Protocol.DEFAULT_HOST, conf.getInt("spark.redis.port", Protocol.DEFAULT_PORT), conf.get("spark.redis.auth", null), conf.getInt("spark.redis.db", Protocol.DEFAULT_DATABASE), - conf.getInt("spark.redis.timeout", Protocol.DEFAULT_TIMEOUT) + conf.getInt("spark.redis.timeout", Protocol.DEFAULT_TIMEOUT), + conf.getBoolean("spark.redis.ssl", false) ) } @@ -47,7 +50,8 @@ case class RedisEndpoint(host: String = Protocol.DEFAULT_HOST, * @param uri connection URI in the form of redis://:$password@$host:$port/[dbnum] */ def this(uri: URI) { - this(uri.getHost, uri.getPort, JedisURIHelper.getPassword(uri), JedisURIHelper.getDBIndex(uri)) + this(uri.getHost, uri.getPort, JedisURIHelper.getPassword(uri), JedisURIHelper.getDBIndex(uri), + Protocol.DEFAULT_TIMEOUT, uri.getScheme == RedisSslScheme) } /** diff --git a/src/main/scala/com/redislabs/provider/redis/package.scala b/src/main/scala/com/redislabs/provider/redis/package.scala index 260d5590..1f1ffce4 100644 --- a/src/main/scala/com/redislabs/provider/redis/package.scala +++ b/src/main/scala/com/redislabs/provider/redis/package.scala @@ -1,7 +1,7 @@ package com.redislabs.provider package object redis extends RedisFunctions { - + val RedisSslScheme: String = "rediss" val RedisDataTypeHash: String = "hash" val RedisDataTypeString: String = "string" } From 188bcbb9abb48e466cc45310d791a6613ffcfbc5 Mon Sep 17 00:00:00 2001 From: alanroch Date: Thu, 14 Mar 2019 16:40:55 +0000 Subject: [PATCH 2/4] Issue#140 Update scaladoc https://github.com/RedisLabs/spark-redis/issues/140 --- .../scala/com/redislabs/provider/redis/RedisConfig.scala | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/src/main/scala/com/redislabs/provider/redis/RedisConfig.scala b/src/main/scala/com/redislabs/provider/redis/RedisConfig.scala index b4015ab1..a1d0e5a1 100644 --- a/src/main/scala/com/redislabs/provider/redis/RedisConfig.scala +++ b/src/main/scala/com/redislabs/provider/redis/RedisConfig.scala @@ -11,7 +11,7 @@ import scala.collection.JavaConversions._ /** * RedisEndpoint represents a redis connection endpoint info: host, port, auth password - * db number, and timeout + * db number, timeout and ssl mode * * @param host the redis host or ip * @param port the redis port @@ -28,8 +28,7 @@ case class RedisEndpoint(host: String = Protocol.DEFAULT_HOST, extends Serializable { /** - * Constructor from spark config. set params with spark.redis.host, spark.redis.port, spark.redis.auth and - * spark.redis.db + * Constructor from spark config. set params with spark.redis.host, spark.redis.port, spark.redis.auth, spark.redis.db and spark.redis.ssl * * @param conf spark context config */ @@ -47,7 +46,7 @@ case class RedisEndpoint(host: String = Protocol.DEFAULT_HOST, /** * Constructor with Jedis URI * - * @param uri connection URI in the form of redis://:$password@$host:$port/[dbnum] + * @param uri connection URI in the form of redis://:$password@$host:$port/[dbnum]. Use "rediss://" scheme for redis SSL */ def this(uri: URI) { this(uri.getHost, uri.getPort, JedisURIHelper.getPassword(uri), JedisURIHelper.getDBIndex(uri), @@ -57,7 +56,7 @@ case class RedisEndpoint(host: String = Protocol.DEFAULT_HOST, /** * Constructor with Jedis URI from String * - * @param uri connection URI in the form of redis://:$password@$host:$port/[dbnum] + * @param uri connection URI in the form of redis://:$password@$host:$port/[dbnum]. Use "rediss://" scheme for redis SSL */ def this(uri: String) { this(URI.create(uri)) From 3b03b04fe9a73a36d84a23f3cf3d41c378b11f54 Mon Sep 17 00:00:00 2001 From: Christian Date: Mon, 13 Apr 2020 15:45:29 -0400 Subject: [PATCH 3/4] Add ssl params to all invocations of RedisEndpoint ctor. --- .../scala/com/redislabs/provider/redis/ConnectionPool.scala | 2 ++ .../scala/com/redislabs/provider/redis/RedisConfig.scala | 6 +++--- .../org/apache/spark/sql/redis/RedisSourceRelation.scala | 3 ++- 3 files changed, 7 insertions(+), 4 deletions(-) diff --git a/src/main/scala/com/redislabs/provider/redis/ConnectionPool.scala b/src/main/scala/com/redislabs/provider/redis/ConnectionPool.scala index f5322f29..322d8c5f 100644 --- a/src/main/scala/com/redislabs/provider/redis/ConnectionPool.scala +++ b/src/main/scala/com/redislabs/provider/redis/ConnectionPool.scala @@ -11,6 +11,7 @@ import scala.collection.JavaConversions._ object ConnectionPool { @transient private lazy val pools: ConcurrentHashMap[RedisEndpoint, JedisPool] = new ConcurrentHashMap[RedisEndpoint, JedisPool]() + def connect(re: RedisEndpoint): Jedis = { val pool = pools.getOrElseUpdate(re, { @@ -23,6 +24,7 @@ object ConnectionPool { poolConfig.setMinEvictableIdleTimeMillis(60000) poolConfig.setTimeBetweenEvictionRunsMillis(30000) poolConfig.setNumTestsPerEvictionRun(-1) + new JedisPool(poolConfig, re.host, re.port, re.timeout, re.auth, re.dbNum, re.ssl) } ) diff --git a/src/main/scala/com/redislabs/provider/redis/RedisConfig.scala b/src/main/scala/com/redislabs/provider/redis/RedisConfig.scala index a1d0e5a1..91e2f05e 100644 --- a/src/main/scala/com/redislabs/provider/redis/RedisConfig.scala +++ b/src/main/scala/com/redislabs/provider/redis/RedisConfig.scala @@ -254,7 +254,7 @@ class RedisConfig(val initialHost: RedisEndpoint) extends Serializable { //simply re-enter this function witht he master host/port getNonClusterNodes(initialHost = new RedisEndpoint(host, port, - initialHost.auth, initialHost.dbNum)) + initialHost.auth, initialHost.dbNum, ssl = initialHost.ssl)) } else { //this is a master - take its slaves @@ -270,7 +270,7 @@ class RedisConfig(val initialHost: RedisEndpoint) extends Serializable { val range = nodes.length (0 until range).map(i => RedisNode(RedisEndpoint(nodes(i)._1, nodes(i)._2, initialHost.auth, initialHost.dbNum, - initialHost.timeout), + initialHost.timeout, initialHost.ssl), 0, 16383, i, range)).toArray } } @@ -300,7 +300,7 @@ class RedisConfig(val initialHost: RedisEndpoint) extends Serializable { val host = SafeEncoder.encode(node.get(0).asInstanceOf[Array[scala.Byte]]) val port = node.get(1).toString.toInt RedisNode(RedisEndpoint(host, port, initialHost.auth, initialHost.dbNum, - initialHost.timeout), + initialHost.timeout, initialHost.ssl), sPos, ePos, i, diff --git a/src/main/scala/org/apache/spark/sql/redis/RedisSourceRelation.scala b/src/main/scala/org/apache/spark/sql/redis/RedisSourceRelation.scala index 245e2967..f2c84911 100644 --- a/src/main/scala/org/apache/spark/sql/redis/RedisSourceRelation.scala +++ b/src/main/scala/org/apache/spark/sql/redis/RedisSourceRelation.scala @@ -40,7 +40,8 @@ class RedisSourceRelation(override val sqlContext: SQLContext, val auth = parameters.getOrElse("auth", null) val dbNum = parameters.get("dbNum").map(_.toInt).getOrElse(Protocol.DEFAULT_DATABASE) val timeout = parameters.get("timeout").map(_.toInt).getOrElse(Protocol.DEFAULT_TIMEOUT) - RedisEndpoint(host, port, auth, dbNum, timeout) + val ssl = parameters.get("ssl").map(_.toBoolean).getOrElse(false) + RedisEndpoint(host, port, auth, dbNum, timeout, ssl) } ) } From acef32a263e811f4d158cdbf092ea0b5980681a4 Mon Sep 17 00:00:00 2001 From: Christian Date: Fri, 24 Apr 2020 21:26:04 -0400 Subject: [PATCH 4/4] Add spark.redis.ssl to configuration doc. --- doc/configuration.md | 1 + 1 file changed, 1 insertion(+) diff --git a/doc/configuration.md b/doc/configuration.md index 9a20b1c0..353f4e47 100644 --- a/doc/configuration.md +++ b/doc/configuration.md @@ -10,6 +10,7 @@ topology from the initial node, so there is no need to provide the rest of the c * `spark.redis.timeout` - connection timeout in ms, 2000 ms by default * `spark.redis.max.pipeline.size` - the maximum number of commands per pipeline (used to batch commands). The default value is 100. * `spark.redis.scan.count` - count option of SCAN command (used to iterate over keys). The default value is 100. +* `spark.redis.ssl` - set to true to use tls