Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions doc/dataframe.md
Original file line number Diff line number Diff line change
Expand Up @@ -341,11 +341,11 @@ root
| max.pipeline.size | maximum number of commands per pipeline (used to batch commands) | `Int` | 100 |
| scan.count | count option of SCAN command (used to iterate over keys) | `Int` | 100 |
| iterator.grouping.size | the number of items to be grouped when iterating over underlying RDD partition | `Int` | 1000 |
| host | overrides `spark.redis.host` configured in SparkSession (if set, any other connection setting from SparkSession is ignored ) | `String` | `localhost` |
| port | overrides `spark.redis.port` configured in SparkSession (if set, any other connection setting from SparkSession is ignored ) | `Int` | `6379` |
| auth | overrides `spark.redis.auth` configured in SparkSession (if set, any other connection setting from SparkSession is ignored ) | `String` | - |
| dbNum | overrides `spark.redis.db` configured in SparkSession (if set, any other connection setting from SparkSession is ignored ) | `Int` | `0` |
| timeout | overrides `spark.redis.timeout` configured in SparkSession (if set, any other connection setting from SparkSession is ignored ) | `Int` | `2000` |
| host | overrides `spark.redis.host` configured in SparkSession | `String` | `localhost` |
| port | overrides `spark.redis.port` configured in SparkSession | `Int` | `6379` |
| auth | overrides `spark.redis.auth` configured in SparkSession | `String` | - |
| dbNum | overrides `spark.redis.db` configured in SparkSession | `Int` | `0` |
| timeout | overrides `spark.redis.timeout` configured in SparkSession | `Int` | `2000` |


## Known limitations
Expand Down
15 changes: 14 additions & 1 deletion doc/structured-streaming.md
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ there will be the output `keys output:*`:
4) "30.5"
```

Please refer to [DataFrame docs](dataframe.md) for different options(such as specifying key name) available for writing .
Please refer to [DataFrame docs](dataframe.md) for different options (such as specifying key name) available for writing.

### Stream Offset

Expand Down Expand Up @@ -142,6 +142,19 @@ Please note, item ordering will be preserved only within a particular Redis key

With the second approach you can read data from a single Redis key with multiple consumers in parallel, e.g. `option("stream.parallelism", 4)`. Each consumer will be mapped to a Spark partition. There are no ordering guarantees in this case.

### Connection options

Similarly to the DataFrame API, connection options can be overridden at the individual stream level using the following options passed to `spark.readStream`:

| Name | Description | Type | Default |
| -----------| -------------------------------------------------------------| ---------- | ----------- |
| host | overrides `spark.redis.host` configured in SparkSession | `String` | `localhost` |
| port | overrides `spark.redis.port` configured in SparkSession | `Int` | `6379` |
| auth | overrides `spark.redis.auth` configured in SparkSession | `String` | - |
| dbNum | overrides `spark.redis.db` configured in SparkSession | `Int` | `0` |
| timeout | overrides `spark.redis.timeout` configured in SparkSession | `Int` | `2000` |


### Other configuration

Spark-Redis automatically creates a consumer group with name `spark-source` if it doesn't exist. You can customize the consumer group name with
Expand Down
22 changes: 22 additions & 0 deletions src/main/scala/com/redislabs/provider/redis/RedisConfig.scala
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,24 @@ case class RedisEndpoint(host: String = Protocol.DEFAULT_HOST,
)
}


/**
 * Auxiliary constructor that resolves every connection setting with the
 * precedence: source-specific parameter -> SparkConf entry -> library default.
 *
 * @param conf       spark context config; read via the `spark.redis.*` keys
 * @param parameters source specific parameters; recognized keys are
 *                   "host", "port", "auth", "dbNum", "timeout" and "ssl"
 */
def this(conf: SparkConf, parameters: Map[String, String]) {
this(
parameters.getOrElse("host", conf.get("spark.redis.host", Protocol.DEFAULT_HOST)),
parameters.getOrElse("port", conf.get("spark.redis.port", Protocol.DEFAULT_PORT.toString)).toInt,
parameters.getOrElse("auth", conf.get("spark.redis.auth", null)), // NOTE(review): null presumably means "no auth" downstream — confirm
parameters.getOrElse("dbNum", conf.get("spark.redis.db", Protocol.DEFAULT_DATABASE.toString)).toInt,
parameters.getOrElse("timeout", conf.get("spark.redis.timeout", Protocol.DEFAULT_TIMEOUT.toString)).toInt,
parameters.getOrElse("ssl", conf.get("spark.redis.ssl", false.toString)).toBoolean)
}


/**
* Constructor with Jedis URI
*
Expand Down Expand Up @@ -122,6 +140,10 @@ object RedisConfig {
def fromSparkConf(conf: SparkConf): RedisConfig = {
new RedisConfig(new RedisEndpoint(conf))
}

/**
 * Builds a [[RedisConfig]] from the Spark configuration, allowing individual
 * settings to be overridden by source-specific parameters.
 *
 * @param conf       spark context config providing the `spark.redis.*` defaults
 * @param parameters source specific parameters that take precedence over `conf`
 */
def fromSparkConfAndParameters(conf: SparkConf, parameters: Map[String, String]): RedisConfig = {
  val initialEndpoint = new RedisEndpoint(conf, parameters)
  new RedisConfig(initialEndpoint)
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,21 +30,8 @@ class RedisSourceRelation(override val sqlContext: SQLContext,
with Serializable
with Logging {

private implicit val redisConfig: RedisConfig = {
new RedisConfig(
if ((parameters.keySet & Set("host", "port", "auth", "dbNum", "timeout")).isEmpty) {
new RedisEndpoint(sqlContext.sparkContext.getConf)
} else {
val host = parameters.getOrElse("host", Protocol.DEFAULT_HOST)
val port = parameters.get("port").map(_.toInt).getOrElse(Protocol.DEFAULT_PORT)
val auth = parameters.getOrElse("auth", null)
val dbNum = parameters.get("dbNum").map(_.toInt).getOrElse(Protocol.DEFAULT_DATABASE)
val timeout = parameters.get("timeout").map(_.toInt).getOrElse(Protocol.DEFAULT_TIMEOUT)
val ssl = parameters.get("ssl").map(_.toBoolean).getOrElse(false)
RedisEndpoint(host, port, auth, dbNum, timeout, ssl)
}
)
}
// Connection settings resolved per relation: options passed to the DataFrame
// reader/writer (host, port, auth, dbNum, timeout, ssl) override the
// corresponding `spark.redis.*` entries from the SparkConf.
private implicit val redisConfig: RedisConfig = RedisConfig.fromSparkConfAndParameters(
sqlContext.sparkContext.getConf, parameters)

implicit private val readWriteConfig: ReadWriteConfig = {
val global = ReadWriteConfig.fromSparkConf(sqlContext.sparkContext.getConf)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ class RedisSource(sqlContext: SQLContext, metadataPath: String,

private val sc = sqlContext.sparkContext

implicit private val redisConfig: RedisConfig = RedisConfig.fromSparkConf(sc.getConf)
// Per-stream connection settings: source options override `spark.redis.*` SparkConf entries.
implicit private val redisConfig: RedisConfig = RedisConfig.fromSparkConfAndParameters(sc.getConf, parameters)

private val sourceConfig = RedisSourceConfig.fromMap(parameters)

Expand Down