From 484f0329cb51844620c08306a3a17d9dfd34796d Mon Sep 17 00:00:00 2001 From: Alex Ott Date: Mon, 25 Jan 2021 12:17:08 +0100 Subject: [PATCH 1/2] Allow to override Redis options for spark.readStream It's possible to override the Redis options for specific `spark.read` operation, but it's impossible to do for the `spark.readStream`. This PR allows to do that, unifying the implementation of doing that. --- .../provider/redis/RedisConfig.scala | 22 +++++++++++++++++++ .../spark/sql/redis/RedisSourceRelation.scala | 17 ++------------ .../spark/sql/redis/stream/RedisSource.scala | 2 +- 3 files changed, 25 insertions(+), 16 deletions(-) diff --git a/src/main/scala/com/redislabs/provider/redis/RedisConfig.scala b/src/main/scala/com/redislabs/provider/redis/RedisConfig.scala index 91e2f05e..7b6d239e 100644 --- a/src/main/scala/com/redislabs/provider/redis/RedisConfig.scala +++ b/src/main/scala/com/redislabs/provider/redis/RedisConfig.scala @@ -43,6 +43,24 @@ case class RedisEndpoint(host: String = Protocol.DEFAULT_HOST, ) } + + /** + * Constructor from spark config and parameters. + * + * @param conf spark context config + * @param parameters source specific parameters + */ + def this(conf: SparkConf, parameters: Map[String, String]) { + this( + parameters.getOrElse("host", conf.get("spark.redis.host", Protocol.DEFAULT_HOST)), + parameters.getOrElse("port", conf.get("spark.redis.port", Protocol.DEFAULT_PORT.toString)).toInt, + parameters.getOrElse("auth", conf.get("spark.redis.auth", null)), + parameters.getOrElse("dbNum", conf.get("spark.redis.db", Protocol.DEFAULT_DATABASE.toString)).toInt, + parameters.getOrElse("timeout", conf.get("spark.redis.timeout", Protocol.DEFAULT_TIMEOUT.toString)).toInt, + parameters.getOrElse("ssl", conf.get("spark.redis.ssl", false.toString)).toBoolean) + } + + /** * Constructor with Jedis URI * @@ -122,6 +140,10 @@ object RedisConfig { def fromSparkConf(conf: SparkConf): RedisConfig = { new RedisConfig(new RedisEndpoint(conf)) } + + def fromSparkConfAndParameters(conf: SparkConf, parameters: Map[String, String]): RedisConfig = { + new RedisConfig(new RedisEndpoint(conf, parameters)) + } } /** diff --git a/src/main/scala/org/apache/spark/sql/redis/RedisSourceRelation.scala b/src/main/scala/org/apache/spark/sql/redis/RedisSourceRelation.scala index 06487471..3553f851 100644 --- a/src/main/scala/org/apache/spark/sql/redis/RedisSourceRelation.scala +++ b/src/main/scala/org/apache/spark/sql/redis/RedisSourceRelation.scala @@ -30,21 +30,8 @@ class RedisSourceRelation(override val sqlContext: SQLContext, with Serializable with Logging { - private implicit val redisConfig: RedisConfig = { - new RedisConfig( - if ((parameters.keySet & Set("host", "port", "auth", "dbNum", "timeout")).isEmpty) { - new RedisEndpoint(sqlContext.sparkContext.getConf) - } else { - val host = parameters.getOrElse("host", Protocol.DEFAULT_HOST) - val port = parameters.get("port").map(_.toInt).getOrElse(Protocol.DEFAULT_PORT) - val auth = parameters.getOrElse("auth", null) - val dbNum = parameters.get("dbNum").map(_.toInt).getOrElse(Protocol.DEFAULT_DATABASE) - val timeout = parameters.get("timeout").map(_.toInt).getOrElse(Protocol.DEFAULT_TIMEOUT) - val ssl = parameters.get("ssl").map(_.toBoolean).getOrElse(false) - RedisEndpoint(host, port, auth, dbNum, timeout, ssl) - } - ) - } + private implicit val redisConfig: RedisConfig = RedisConfig.fromSparkConfAndParameters( + sqlContext.sparkContext.getConf, parameters) implicit private val readWriteConfig: ReadWriteConfig = { val global = ReadWriteConfig.fromSparkConf(sqlContext.sparkContext.getConf) diff --git a/src/main/scala/org/apache/spark/sql/redis/stream/RedisSource.scala b/src/main/scala/org/apache/spark/sql/redis/stream/RedisSource.scala index 4a39a3ca..f1f8e8a6 100644 --- a/src/main/scala/org/apache/spark/sql/redis/stream/RedisSource.scala +++ b/src/main/scala/org/apache/spark/sql/redis/stream/RedisSource.scala @@ -25,7 +25,7 @@ class RedisSource(sqlContext: SQLContext, metadataPath: String, private val sc = sqlContext.sparkContext - implicit private val redisConfig: RedisConfig = RedisConfig.fromSparkConf(sc.getConf) + implicit private val redisConfig: RedisConfig = RedisConfig.fromSparkConfAndParameters(sc.getConf, parameters) private val sourceConfig = RedisSourceConfig.fromMap(parameters) From c2f3533e3ff78fd3d3c3ef128cfbba874f9e44e0 Mon Sep 17 00:00:00 2001 From: Alex Ott Date: Tue, 26 Jan 2021 08:18:47 +0100 Subject: [PATCH 2/2] document connection-level options for spark.readStream --- doc/dataframe.md | 10 +++++----- doc/structured-streaming.md | 15 ++++++++++++++- 2 files changed, 19 insertions(+), 6 deletions(-) diff --git a/doc/dataframe.md b/doc/dataframe.md index d4b51d6c..3d62b78f 100644 --- a/doc/dataframe.md +++ b/doc/dataframe.md @@ -341,11 +341,11 @@ root | max.pipeline.size | maximum number of commands per pipeline (used to batch commands) | `Int` | 100 | | scan.count | count option of SCAN command (used to iterate over keys) | `Int` | 100 | | iterator.grouping.size | the number of items to be grouped when iterating over underlying RDD partition | `Int` | 1000 | -| host | overrides `spark.redis.host` configured in SparkSession (if set, any other connection setting from SparkSession is ignored ) | `String` | `localhost` | -| port | overrides `spark.redis.port` configured in SparkSession (if set, any other connection setting from SparkSession is ignored ) | `Int` | `6379` | -| auth | overrides `spark.redis.auth` configured in SparkSession (if set, any other connection setting from SparkSession is ignored ) | `String` | - | -| dbNum | overrides `spark.redis.db` configured in SparkSession (if set, any other connection setting from SparkSession is ignored ) | `Int` | `0` | -| timeout | overrides `spark.redis.timeout` configured in SparkSession (if set, any other connection setting from SparkSession is ignored ) | `Int` | `2000` | +| host | overrides `spark.redis.host` configured in SparkSession | `String` | `localhost` | +| port | overrides `spark.redis.port` configured in SparkSession | `Int` | `6379` | +| auth | overrides `spark.redis.auth` configured in SparkSession | `String` | - | +| dbNum | overrides `spark.redis.db` configured in SparkSession | `Int` | `0` | +| timeout | overrides `spark.redis.timeout` configured in SparkSession | `Int` | `2000` | ## Known limitations diff --git a/doc/structured-streaming.md b/doc/structured-streaming.md index 12ca3651..f68510e9 100644 --- a/doc/structured-streaming.md +++ b/doc/structured-streaming.md @@ -84,7 +84,7 @@ there will be the output `keys output:*`: 4) "30.5" ``` -Please refer to [DataFrame docs](dataframe.md) for different options(such as specifying key name) available for writing . +Please refer to [DataFrame docs](dataframe.md) for different options (such as specifying key name) available for writing. ### Stream Offset @@ -142,6 +142,19 @@ Please note, item ordering will be preserved only within a particular Redis key With the second approach you can read data from a single Redis key with multiple consumers in parallel, e.g. `option("stream.parallelism", 4)`. Each consumer will be mapped to a Spark partition. There are no ordering guarantees in this case. +### Connection options + +Similarly to Dataframe API, we can override connection options on the individual stream level, using following options passed to `spark.readStream`: + +| Name | Description | Type | Default | +| -----------| -------------------------------------------------------------| ---------- | ----------- | +| host | overrides `spark.redis.host` configured in SparkSession | `String` | `localhost` | +| port | overrides `spark.redis.port` configured in SparkSession | `Int` | `6379` | +| auth | overrides `spark.redis.auth` configured in SparkSession | `String` | - | +| dbNum | overrides `spark.redis.db` configured in SparkSession | `Int` | `0` | +| timeout | overrides `spark.redis.timeout` configured in SparkSession | `Int` | `2000` | + + ### Other configuration Spark-Redis automatically creates a consumer group with name `spark-source` if it doesn't exist. You can customize the consumer group name with