
[SPARK-19853][SS] uppercase kafka topics fail when startingOffsets are SpecificOffsets #17209

Closed
wants to merge 6 commits

Conversation

uncleGen
Contributor

@uncleGen uncleGen commented Mar 8, 2017

What changes were proposed in this pull request?

When using the KafkaSource with Structured Streaming, consumer assignments are not what the user expects if startingOffsets is set to an explicit set of topics/partitions in JSON where the topic(s) happen to contain uppercase characters. When the starting offsets are parsed, the original string value from the options is lowercased (toLowerCase) so that matching on "earliest" and "latest" is case-insensitive. However, in the fallthrough case the lowercased JSON is passed to SpecificOffsets, so topic names may no longer be what the user intended by the time assignments are made with the underlying KafkaConsumer.

KafkaSourceProvider.scala:

```scala
val startingOffsets = caseInsensitiveParams.get(STARTING_OFFSETS_OPTION_KEY).map(_.trim.toLowerCase) match {
  case Some("latest") => LatestOffsets
  case Some("earliest") => EarliestOffsets
  case Some(json) => SpecificOffsets(JsonUtils.partitionOffsets(json))
  case None => LatestOffsets
}
```

Thanks to @cbowden for reporting.

How was this patch tested?

Jenkins

@SparkQA

SparkQA commented Mar 8, 2017

Test build #74205 has finished for PR 17209 at commit e2a26bf.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cbcwebdev cbcwebdev left a comment

LGTM. Perhaps, as a small additional enhancement, the underlying tests could be modified to include some uppercase characters in the topic name(s) for at least one case.

@uncleGen
Contributor Author

uncleGen commented Mar 9, 2017

cc @zsxwing

@SparkQA

SparkQA commented Mar 9, 2017

Test build #74237 has finished for PR 17209 at commit 9402f9d.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

```scala
case Some("latest") => LatestOffsetRangeLimit
case Some("earliest") => EarliestOffsetRangeLimit
caseInsensitiveParams.get(STARTING_OFFSETS_OPTION_KEY).map(_.trim) match {
case a @ Some(offset) if offset.toLowerCase.equals("latest") => LatestOffsetRangeLimit
```
Member

`case a @ Some(offset) if offset.toLowerCase.equals("latest")` -> `case Some(offset) if offset.toLowerCase == "latest"`.

Please also fix other places.
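For context (not part of the patch): in Scala, `==` on references delegates to `equals` with an added null check, so the shorter form suggested above is equivalent for strings and more idiomatic.

```scala
val offset = "Latest"
// `==` and `.equals` agree for non-null strings; `==` additionally
// handles a null receiver (returning false) instead of throwing an NPE.
assert((offset.toLowerCase == "latest") == offset.toLowerCase.equals("latest"))
```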

@zsxwing zsxwing (Member) Mar 10, 2017

Could you just write a utility method to get offsets from caseInsensitiveParams? Then you can just test this method.

@SparkQA

SparkQA commented Mar 10, 2017

Test build #74291 has finished for PR 17209 at commit a37d11d.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@zsxwing
Member

zsxwing commented Mar 10, 2017

Could you add a utility method like this?

```scala
def getKafkaOffsetRangeLimit(
    params: Map[String, String],
    key: String,
    default: KafkaOffsetRangeLimit): KafkaOffsetRangeLimit = {
  params.get(key).map(_.trim) match {
    case Some(offset) if offset.toLowerCase == "latest" => LatestOffsetRangeLimit
    case Some(offset) if offset.toLowerCase == "earliest" => EarliestOffsetRangeLimit
    case Some(json) => SpecificOffsetRangeLimit(JsonUtils.partitionOffsets(json))
    case None => default
  }
}
```

Then you can reuse the method and only test it, e.g.,

```scala
val startingOffsets = getKafkaOffsetRangeLimit(
  caseInsensitiveParams,
  STARTING_OFFSETS_OPTION_KEY,
  EarliestOffsetRangeLimit
)

// Batch specific options
startingOffsets match {
  case EarliestOffsetRangeLimit => // good to go
  case LatestOffsetRangeLimit =>
    throw new IllegalArgumentException("starting offset can't be latest " +
      "for batch queries on Kafka")
  case SpecificOffsetRangeLimit(partitionOffsets) =>
    partitionOffsets.foreach {
      case (tp, off) if off == KafkaOffsetRangeLimit.LATEST =>
        throw new IllegalArgumentException(s"startingOffsets for $tp can't " +
          "be latest for batch queries on Kafka")
      case _ => // ignore
    }
}
```
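A self-contained model of this suggestion (stub types, with the JSON kept as a raw string instead of going through `JsonUtils` — names here are illustrative, not Spark's actual classes) makes the fixed behavior easy to check: only the `latest`/`earliest` comparison is case-insensitive, while specific-offset JSON keeps its original case.

```scala
sealed trait OffsetRangeLimit
case object LatestOffsetRangeLimit extends OffsetRangeLimit
case object EarliestOffsetRangeLimit extends OffsetRangeLimit
case class SpecificOffsetRangeLimit(json: String) extends OffsetRangeLimit

def getOffsetRangeLimit(
    params: Map[String, String],
    key: String,
    default: OffsetRangeLimit): OffsetRangeLimit = {
  params.get(key).map(_.trim) match {
    // Only the comparison is lowercased; the original value is untouched.
    case Some(offset) if offset.toLowerCase == "latest" => LatestOffsetRangeLimit
    case Some(offset) if offset.toLowerCase == "earliest" => EarliestOffsetRangeLimit
    // JSON passes through with its case preserved.
    case Some(json) => SpecificOffsetRangeLimit(json)
    case None => default
  }
}

// "LATEST" still matches case-insensitively.
assert(getOffsetRangeLimit(Map("startingoffsets" -> "LATEST"),
  "startingoffsets", EarliestOffsetRangeLimit) == LatestOffsetRangeLimit)
// Uppercase topic names survive untouched.
assert(getOffsetRangeLimit(Map("startingoffsets" -> """{"TopicA":{"0":23}}"""),
  "startingoffsets", LatestOffsetRangeLimit) ==
  SpecificOffsetRangeLimit("""{"TopicA":{"0":23}}"""))
// Missing key falls back to the default.
assert(getOffsetRangeLimit(Map.empty, "startingoffsets",
  LatestOffsetRangeLimit) == LatestOffsetRangeLimit)
```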

@uncleGen
Contributor Author

@zsxwing done, but I forgot to push; I will update it as soon as possible once I am connected to the internet.

@SparkQA

SparkQA commented Mar 11, 2017

Test build #74380 has finished for PR 17209 at commit 50ef0e1.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

```scala
private val STARTING_OFFSETS_OPTION_KEY = "startingoffsets"
private val ENDING_OFFSETS_OPTION_KEY = "endingoffsets"
private[kafka010] val STARTING_OFFSETS_OPTION_KEY = "startingoffsets"
private[kafka010] val ENDING_OFFSETS_OPTION_KEY = "endingoffsets"
private val FAIL_ON_DATA_LOSS_OPTION_KEY = "failondataloss"
```
Contributor Author

Changed visibility for the unit test.

@zsxwing zsxwing (Member) left a comment

Looks pretty good. Just some minor issues.

```scala
case None => EarliestOffsetRangeLimit
val startingRelationOffsets = KafkaSourceProvider.getKafkaOffsetRangeLimit(
  caseInsensitiveParams, STARTING_OFFSETS_OPTION_KEY, EarliestOffsetRangeLimit) match {
  case earliest @ EarliestOffsetRangeLimit => earliest
```
Member

startingRelationOffsets won't be latest since it's checked in validateBatchOptions.
Why not just:

```scala
val startingRelationOffsets = KafkaSourceProvider.getKafkaOffsetRangeLimit(
  caseInsensitiveParams,
  STARTING_OFFSETS_OPTION_KEY,
  EarliestOffsetRangeLimit)
assert(startingRelationOffsets != LatestOffsetRangeLimit)
```

Contributor Author

👍 much simpler

```scala
case None => LatestOffsetRangeLimit
val endingRelationOffsets = KafkaSourceProvider.getKafkaOffsetRangeLimit(caseInsensitiveParams,
  ENDING_OFFSETS_OPTION_KEY, LatestOffsetRangeLimit) match {
  case latest @ LatestOffsetRangeLimit => latest
```
Member

Same as above

```scala
throw new IllegalArgumentException("starting offset can't be latest " +
  "for batch queries on Kafka")
case Some(json) => (SpecificOffsetRangeLimit(JsonUtils.partitionOffsets(json)))
  .partitionOffsets.foreach {
case specific: SpecificOffsetRangeLimit =>
```
Member

nit: `case SpecificOffsetRangeLimit(partitionOffsets) =>`

```scala
case Some(json) => (SpecificOffsetRangeLimit(JsonUtils.partitionOffsets(json)))
  .partitionOffsets.foreach {
case LatestOffsetRangeLimit => // good to go
case specific: SpecificOffsetRangeLimit =>
```
Member

nit: `case SpecificOffsetRangeLimit(partitionOffsets) =>`

```
@@ -606,6 +607,36 @@ class KafkaSourceSuite extends KafkaSourceTest {
  assert(query.exception.isEmpty)
}

for((optionKey, optionValue, answer) <- Seq(
```
Member

nit: move the for loop into the test. No need to create many tests here.

```
  }
}

for((optionKey, answer) <- Seq(
```
Member

Same as above

```
  answer
)

assert(offset == answer)
```
Member

nit: `==` => `===`

@SparkQA

SparkQA commented Mar 12, 2017

Test build #74399 has finished for PR 17209 at commit be9f87e.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

asfgit pushed a commit that referenced this pull request Mar 13, 2017
[SPARK-19853][SS] uppercase kafka topics fail when startingOffsets are SpecificOffsets

(commit message body duplicates the PR description above)

Author: uncleGen <hustyugm@gmail.com>

Closes #17209 from uncleGen/SPARK-19853.

(cherry picked from commit 0a4d06a)
Signed-off-by: Shixiong Zhu <shixiong@databricks.com>
```
@@ -606,6 +607,24 @@ class KafkaSourceSuite extends KafkaSourceTest {
  assert(query.exception.isEmpty)
}

test("test to get offsets from case insensitive parameters") {
```
Member

nit: remove "test to". I will fix it when merging your PR.

Contributor Author

Thanks!

@zsxwing
Member

zsxwing commented Mar 13, 2017

Thanks. Merging to master and 2.1.

@asfgit asfgit closed this in 0a4d06a Mar 13, 2017