
[SPARK-21873][SS] - Avoid using return inside CachedKafkaConsumer.get #19059

Closed
YuvalItzchakov wants to merge 2 commits into apache:master from YuvalItzchakov:master

Conversation

@YuvalItzchakov

During profiling of a structured streaming application with Kafka as the source, I came across this exception:

[Profiler screenshot: Structured Streaming Kafka exceptions]

This is a 1-minute sample, during which 106K NonLocalReturnControl exceptions were thrown.
This happens because CachedKafkaConsumer.get is run inside:

private def runUninterruptiblyIfPossible[T](body: => T): T

where body: => T is the get method. Because get is passed as a by-name argument, the return inside it becomes a non-local return: to escape the while loop defined in get, the generated code has to throw (and catch at the method boundary) the scala.runtime.NonLocalReturnControl exception seen above.
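The mechanism can be reproduced outside Spark. Below is a minimal sketch (Scala 2 style); NonLocalReturnDemo, runBody, and lookup are made-up names standing in for runUninterruptiblyIfPossible and CachedKafkaConsumer.get, not the actual Spark code:

    object NonLocalReturnDemo {
      // Stand-in for a wrapper such as runUninterruptiblyIfPossible:
      // it simply evaluates a by-name body.
      def runBody[T](body: => T): T = body

      def lookup(values: Seq[Int], target: Int): Option[Int] = {
        var i = 0
        while (i < values.length) {
          // This `return` executes inside runBody's thunk, so the only way
          // back to lookup is a scala.runtime.NonLocalReturnControl thrown
          // by the generated code and caught at lookup's boundary --
          // one exception per returning call.
          runBody {
            if (values(i) == target) return Some(values(i))
          }
          i += 1
        }
        None
      }

      def main(args: Array[String]): Unit =
        println(lookup(Seq(1, 2, 3), 2)) // Some(2), via a hidden throw/catch
    }

Profiling a loop like this shows the same pattern as the screenshot above: one NonLocalReturnControl allocation and throw per returning call.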

What changes were proposed in this pull request?

Instead of using return (which is generally discouraged in Scala), we store the result of fetchData in a local variable and use a boolean flag to indicate whether the fetch has completed; that flag becomes part of the while loop's predicate. A sketch of the resulting loop shape follows.
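Rough sketch of that loop shape; Record, fetchRecord, UNKNOWN_OFFSET, and the timeout handling are placeholders for illustration, not the actual CachedKafkaConsumer API:

    object FlagBasedGetSketch {
      final case class Record(offset: Long, value: String)

      val UNKNOWN_OFFSET: Long = -2L

      def get(fetchRecord: Long => Record, startOffset: Long): Record = {
        var toFetchOffset = startOffset
        var consumerRecord: Record = null
        var isFetchComplete = false

        // Exit by flipping the flag (or exhausting offsets) instead of
        // returning from inside the loop, so no NonLocalReturnControl is
        // thrown when this body runs inside a by-name wrapper.
        while (toFetchOffset != UNKNOWN_OFFSET && !isFetchComplete) {
          try {
            consumerRecord = fetchRecord(toFetchOffset)
            isFetchComplete = true
          } catch {
            case _: java.util.concurrent.TimeoutException =>
              // The real code resolves offset ranges / data loss here;
              // this sketch simply gives up.
              toFetchOffset = UNKNOWN_OFFSET
          }
        }

        if (isFetchComplete) consumerRecord else {
          null
        }
      }
    }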

How was this patch tested?

I ran the KafkaSourceSuite to make sure there are no regressions. Since the exception isn't visible from user code, there is no way (at least that I could think of) to add a test for this to the existing suite.

… to `org.apache.spark.util.UninterruptibleThread.runUninterruptibly` as a function type, which causes a NonLocalReturnControl to be thrown for every call
Diff context:

    resetFetchedData()
    null

    if (isFetchComplete) consumerRecord else {

Member:

Go ahead and put the if clause on a new line with braces.

Author:

Done

Diff context:

      while (toFetchOffset != UNKNOWN_OFFSET && !isFetchComplete) {
        try {
-         return fetchData(toFetchOffset, untilOffset, pollTimeoutMs, failOnDataLoss)
+         consumerRecord = fetchData(toFetchOffset, untilOffset, pollTimeoutMs, failOnDataLoss)

Member:

Can fetchData return null? If not, then the condition can just be on consumerRecord == null.

@SparkQA

SparkQA commented Aug 26, 2017

Test build #3907 has finished for PR 19059 at commit c20bd14.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Aug 29, 2017

Test build #3910 has finished for PR 19059 at commit 18b9301.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@srowen
Member

srowen commented Aug 29, 2017

By the way @YuvalItzchakov, could you make a JIRA and link it? This is small, but it's a non-trivial improvement and I think we should handle it as a normal issue.

@YuvalItzchakov YuvalItzchakov changed the title [SS] - Avoid using return inside CachedKafkaConsumer.get [SPARK-21873][SS] - Avoid using return inside CachedKafkaConsumer.get Aug 30, 2017

@srowen
Member

srowen commented Aug 30, 2017

Merged to master

@asfgit asfgit closed this in 8f0df6b Aug 30, 2017