[SPARK-19853][SS] uppercase kafka topics fail when startingOffsets are SpecificOffsets

When using the KafkaSource with Structured Streaming, consumer assignments are not what the user expects if startingOffsets is set to an explicit set of topics/partitions in JSON and the topic(s) happen to contain uppercase characters. When the startingOffsets option is parsed, the original string value from the options is lowercased via toLowerCase so that matching on "earliest" and "latest" can be case-insensitive. However, the lowercased JSON is what gets passed to SpecificOffsets in the fall-through case, so topic names may no longer be what the user intended by the time assignments are made with the underlying KafkaConsumer.

KafkaSourceProvider.scala:
```
val startingOffsets = caseInsensitiveParams.get(STARTING_OFFSETS_OPTION_KEY).map(_.trim.toLowerCase) match {
  case Some("latest") => LatestOffsets
  case Some("earliest") => EarliestOffsets
  case Some(json) => SpecificOffsets(JsonUtils.partitionOffsets(json))
  case None => LatestOffsets
}
```
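
For illustration, a minimal sketch of how a user could hit this (the broker address, the topic name `TopicA`, and the offset 23 are all hypothetical):

```
import org.apache.spark.sql.SparkSession

// Hypothetical repro: a topic named "TopicA" exists on the broker.
val spark = SparkSession.builder().appName("SPARK-19853-repro").getOrCreate()
val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092") // assumed broker
  .option("assign", """{"TopicA":[0]}""")
  // Before the fix, the whole option value was lowercased, so the source
  // tried to seek on partition "topica-0" instead of "TopicA-0".
  .option("startingOffsets", """{"TopicA":{"0":23}}""")
  .load()
```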

Thanks to cbowden for reporting.

Tested by Jenkins.

Author: uncleGen <hustyugm@gmail.com>

Closes #17209 from uncleGen/SPARK-19853.
uncleGen authored and zsxwing committed Mar 13, 2017
1 parent 9f8ce48 commit 0a4d06a
Showing 2 changed files with 54 additions and 34 deletions.
KafkaSourceProvider.scala:
```diff
@@ -82,13 +82,8 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
         .map { k => k.drop(6).toString -> parameters(k) }
         .toMap
 
-    val startingStreamOffsets =
-      caseInsensitiveParams.get(STARTING_OFFSETS_OPTION_KEY).map(_.trim.toLowerCase) match {
-        case Some("latest") => LatestOffsetRangeLimit
-        case Some("earliest") => EarliestOffsetRangeLimit
-        case Some(json) => SpecificOffsetRangeLimit(JsonUtils.partitionOffsets(json))
-        case None => LatestOffsetRangeLimit
-      }
+    val startingStreamOffsets = KafkaSourceProvider.getKafkaOffsetRangeLimit(caseInsensitiveParams,
+      STARTING_OFFSETS_OPTION_KEY, LatestOffsetRangeLimit)
 
     val kafkaOffsetReader = new KafkaOffsetReader(
       strategy(caseInsensitiveParams),
@@ -128,19 +123,13 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
         .map { k => k.drop(6).toString -> parameters(k) }
         .toMap
 
-    val startingRelationOffsets =
-      caseInsensitiveParams.get(STARTING_OFFSETS_OPTION_KEY).map(_.trim.toLowerCase) match {
-        case Some("earliest") => EarliestOffsetRangeLimit
-        case Some(json) => SpecificOffsetRangeLimit(JsonUtils.partitionOffsets(json))
-        case None => EarliestOffsetRangeLimit
-      }
+    val startingRelationOffsets = KafkaSourceProvider.getKafkaOffsetRangeLimit(
+      caseInsensitiveParams, STARTING_OFFSETS_OPTION_KEY, EarliestOffsetRangeLimit)
+    assert(startingRelationOffsets != LatestOffsetRangeLimit)
 
-    val endingRelationOffsets =
-      caseInsensitiveParams.get(ENDING_OFFSETS_OPTION_KEY).map(_.trim.toLowerCase) match {
-        case Some("latest") => LatestOffsetRangeLimit
-        case Some(json) => SpecificOffsetRangeLimit(JsonUtils.partitionOffsets(json))
-        case None => LatestOffsetRangeLimit
-      }
+    val endingRelationOffsets = KafkaSourceProvider.getKafkaOffsetRangeLimit(caseInsensitiveParams,
+      ENDING_OFFSETS_OPTION_KEY, LatestOffsetRangeLimit)
+    assert(endingRelationOffsets != EarliestOffsetRangeLimit)
 
     val kafkaOffsetReader = new KafkaOffsetReader(
       strategy(caseInsensitiveParams),
@@ -388,34 +377,34 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
 
   private def validateBatchOptions(caseInsensitiveParams: Map[String, String]) = {
     // Batch specific options
-    caseInsensitiveParams.get(STARTING_OFFSETS_OPTION_KEY).map(_.trim.toLowerCase) match {
-      case Some("earliest") => // good to go
-      case Some("latest") =>
+    KafkaSourceProvider.getKafkaOffsetRangeLimit(
+      caseInsensitiveParams, STARTING_OFFSETS_OPTION_KEY, EarliestOffsetRangeLimit) match {
+      case EarliestOffsetRangeLimit => // good to go
+      case LatestOffsetRangeLimit =>
         throw new IllegalArgumentException("starting offset can't be latest " +
           "for batch queries on Kafka")
-      case Some(json) => (SpecificOffsetRangeLimit(JsonUtils.partitionOffsets(json)))
-        .partitionOffsets.foreach {
+      case SpecificOffsetRangeLimit(partitionOffsets) =>
+        partitionOffsets.foreach {
           case (tp, off) if off == KafkaOffsetRangeLimit.LATEST =>
             throw new IllegalArgumentException(s"startingOffsets for $tp can't " +
               "be latest for batch queries on Kafka")
           case _ => // ignore
         }
-      case _ => // default to earliest
     }
 
-    caseInsensitiveParams.get(ENDING_OFFSETS_OPTION_KEY).map(_.trim.toLowerCase) match {
-      case Some("earliest") =>
+    KafkaSourceProvider.getKafkaOffsetRangeLimit(
+      caseInsensitiveParams, ENDING_OFFSETS_OPTION_KEY, LatestOffsetRangeLimit) match {
+      case EarliestOffsetRangeLimit =>
         throw new IllegalArgumentException("ending offset can't be earliest " +
           "for batch queries on Kafka")
-      case Some("latest") => // good to go
-      case Some(json) => (SpecificOffsetRangeLimit(JsonUtils.partitionOffsets(json)))
-        .partitionOffsets.foreach {
+      case LatestOffsetRangeLimit => // good to go
+      case SpecificOffsetRangeLimit(partitionOffsets) =>
+        partitionOffsets.foreach {
          case (tp, off) if off == KafkaOffsetRangeLimit.EARLIEST =>
            throw new IllegalArgumentException(s"ending offset for $tp can't be " +
              "earliest for batch queries on Kafka")
          case _ => // ignore
        }
-      case _ => // default to latest
     }
 
     validateGeneralOptions(caseInsensitiveParams)
@@ -432,7 +421,7 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
 
     def set(key: String, value: Object): this.type = {
       map.put(key, value)
-      logInfo(s"$module: Set $key to $value, earlier value: ${kafkaParams.get(key).getOrElse("")}")
+      logInfo(s"$module: Set $key to $value, earlier value: ${kafkaParams.getOrElse(key, "")}")
      this
    }
 
@@ -450,10 +439,22 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
 
 private[kafka010] object KafkaSourceProvider {
   private val STRATEGY_OPTION_KEYS = Set("subscribe", "subscribepattern", "assign")
-  private val STARTING_OFFSETS_OPTION_KEY = "startingoffsets"
-  private val ENDING_OFFSETS_OPTION_KEY = "endingoffsets"
+  private[kafka010] val STARTING_OFFSETS_OPTION_KEY = "startingoffsets"
+  private[kafka010] val ENDING_OFFSETS_OPTION_KEY = "endingoffsets"
   private val FAIL_ON_DATA_LOSS_OPTION_KEY = "failondataloss"
   val TOPIC_OPTION_KEY = "topic"
 
   private val deserClassName = classOf[ByteArrayDeserializer].getName
+
+  def getKafkaOffsetRangeLimit(
+      params: Map[String, String],
+      offsetOptionKey: String,
+      defaultOffsets: KafkaOffsetRangeLimit): KafkaOffsetRangeLimit = {
+    params.get(offsetOptionKey).map(_.trim) match {
+      case Some(offset) if offset.toLowerCase == "latest" => LatestOffsetRangeLimit
+      case Some(offset) if offset.toLowerCase == "earliest" => EarliestOffsetRangeLimit
+      case Some(json) => SpecificOffsetRangeLimit(JsonUtils.partitionOffsets(json))
+      case None => defaultOffsets
+    }
+  }
 }
```
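
The core of the fix is that getKafkaOffsetRangeLimit lowercases the trimmed option value only for the "earliest"/"latest" keyword comparison; the JSON branch receives the value with its original casing. A minimal sketch of the resulting behavior, assuming it runs inside the org.apache.spark.sql.kafka010 package where the helper and the *OffsetRangeLimit classes are visible (topic name and offset are made up):

```
import org.apache.kafka.common.TopicPartition

// Sketch only: the helper trims the value, then lowercases a copy just for
// the keyword check, so the JSON topic name keeps its case.
val limit = KafkaSourceProvider.getKafkaOffsetRangeLimit(
  Map("startingoffsets" -> """ {"TopicA":{"0":23}} """),
  "startingoffsets",
  LatestOffsetRangeLimit)
// Previously the JSON itself was lowercased, yielding topic "topica".
assert(limit == SpecificOffsetRangeLimit(Map(new TopicPartition("TopicA", 0) -> 23L)))
```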
KafkaSourceSuite.scala:
```diff
@@ -37,6 +37,7 @@ import org.apache.spark.SparkContext
 import org.apache.spark.sql.ForeachWriter
 import org.apache.spark.sql.execution.streaming._
 import org.apache.spark.sql.functions.{count, window}
+import org.apache.spark.sql.kafka010.KafkaSourceProvider._
 import org.apache.spark.sql.streaming.{ProcessingTime, StreamTest}
 import org.apache.spark.sql.test.{SharedSQLContext, TestSparkSession}
 import org.apache.spark.util.Utils
@@ -606,6 +607,24 @@ class KafkaSourceSuite extends KafkaSourceTest
     assert(query.exception.isEmpty)
   }
 
+  test("get offsets from case insensitive parameters") {
+    for ((optionKey, optionValue, answer) <- Seq(
+      (STARTING_OFFSETS_OPTION_KEY, "earLiEst", EarliestOffsetRangeLimit),
+      (ENDING_OFFSETS_OPTION_KEY, "laTest", LatestOffsetRangeLimit),
+      (STARTING_OFFSETS_OPTION_KEY, """{"topic-A":{"0":23}}""",
+        SpecificOffsetRangeLimit(Map(new TopicPartition("topic-A", 0) -> 23))))) {
+      val offset = getKafkaOffsetRangeLimit(Map(optionKey -> optionValue), optionKey, answer)
+      assert(offset === answer)
+    }
+
+    for ((optionKey, answer) <- Seq(
+      (STARTING_OFFSETS_OPTION_KEY, EarliestOffsetRangeLimit),
+      (ENDING_OFFSETS_OPTION_KEY, LatestOffsetRangeLimit))) {
+      val offset = getKafkaOffsetRangeLimit(Map.empty, optionKey, answer)
+      assert(offset === answer)
+    }
+  }
+
   private def newTopic(): String = s"topic-${topicId.getAndIncrement()}"
 
   private def assignString(topic: String, partitions: Iterable[Int]): String = {
```
