
[SPARK-19185][DSTREAM] Make Kafka consumer cache configurable #18234

Closed
wants to merge 2 commits

Conversation

markgrover (Member)

What changes were proposed in this pull request?

Add a new property `spark.streaming.kafka.consumer.cache.enabled` that allows users to enable or disable the cache for Kafka consumers. This property can be especially handy in cases where issues like SPARK-19185 get hit, for which there isn't a solution committed yet. By default, the cache is still on, so this change doesn't alter any out-of-the-box behavior.
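
For illustration only (not part of the patch): a minimal sketch of how the new flag might be set. The property name comes from this PR; the app name and batch interval below are placeholder assumptions.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Hypothetical app setup; only the cache.enabled property is from this PR.
val conf = new SparkConf()
  .setAppName("KafkaCacheToggleExample") // placeholder app name
  // New flag added by this PR; the cache stays enabled unless set to "false".
  .set("spark.streaming.kafka.consumer.cache.enabled", "false")

val ssc = new StreamingContext(conf, Seconds(10)) // assumed batch interval
```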

How was this patch tested?

Running unit tests

@markgrover (Member Author)

This is related to, but is a stripped-down version of, #16629.

@SparkQA

SparkQA commented Jun 7, 2017

Test build #77799 has finished for PR 18234 at commit 68ca3f3.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@@ -91,7 +91,7 @@ The new Kafka consumer API will pre-fetch messages into buffers. Therefore it i

In most cases, you should use `LocationStrategies.PreferConsistent` as shown above. This will distribute partitions evenly across available executors. If your executors are on the same hosts as your Kafka brokers, use `PreferBrokers`, which will prefer to schedule partitions on the Kafka leader for that partition. Finally, if you have a significant skew in load among partitions, use `PreferFixed`. This allows you to specify an explicit mapping of partitions to hosts (any unspecified partitions will use a consistent location).

-The cache for consumers has a default maximum size of 64. If you expect to be handling more than (64 * number of executors) Kafka partitions, you can change this setting via `spark.streaming.kafka.consumer.cache.maxCapacity`
+The cache for consumers has a default maximum size of 64. If you expect to be handling more than (64 * number of executors) Kafka partitions, you can change this setting via `spark.streaming.kafka.consumer.cache.maxCapacity`. If you would like to disable the caching for Kafka consumers, you can set `spark.streaming.kafka.consumer.cache.enabled` to `false`.
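
For illustration only (not part of the patch): a minimal sketch of how the cache properties and `LocationStrategies.PreferConsistent` fit together. The broker address, group id, topic name, app name, and the capacity value of 128 are placeholder assumptions; the two `spark.streaming.kafka.consumer.cache.*` properties are the ones this documentation describes.

```scala
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}

val conf = new SparkConf()
  .setAppName("KafkaLocationStrategyExample") // placeholder app name
  // Raise the consumer cache capacity above the default of 64.
  .set("spark.streaming.kafka.consumer.cache.maxCapacity", "128")
  // Or bypass the cache entirely with the flag from this PR:
  // .set("spark.streaming.kafka.consumer.cache.enabled", "false")

val ssc = new StreamingContext(conf, Seconds(5))

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "localhost:9092", // placeholder broker address
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "example-group" // placeholder group id
)

// PreferConsistent distributes partitions evenly across available executors.
val stream = KafkaUtils.createDirectStream[String, String](
  ssc,
  LocationStrategies.PreferConsistent,
  ConsumerStrategies.Subscribe[String, String](Seq("topicA"), kafkaParams)
)
```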
Member

It might be more conservative to not even publicly document this option, if it's intended as a fairly temporary safety valve. But I'm not against it. At least it should be clear that it isn't guaranteed to remain available going forward.

Member Author

I am open to either option. I'd slightly prefer documenting the option and saying something about there being no guarantee.

Thanks for reviewing, Sean. @koeninger would appreciate your review as well, thanks.

Contributor

Code change LGTM.

I'd prefer clarifying / adding caveats to the documentation, rather than leaving it undocumented.

Member Author

OK, thanks. I have updated the doc to add a little more context. If you could review it and help get this committed, I'd really appreciate that. Thanks again!

@SparkQA

SparkQA commented Jun 8, 2017

Test build #77802 has finished for PR 18234 at commit 2f60741.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@vanzin (Contributor)

vanzin commented Jun 8, 2017

If there are no more comments I'll push this in the morning.

I'd have filed a separate bug, since SPARK-19185 will now forever be "in progress" (unless an admin sees this and changes its status), but it's too late now.

@koeninger (Contributor)

LGTM, thanks Mark

@markgrover (Member Author)

markgrover commented Jun 8, 2017 via email

@srowen (Member)

srowen commented Jun 8, 2017

@vanzin I think it's appropriate to attach this to the existing issue, because it's inherently connected to any other changes that follow. We can definitely un-mark it as In Progress.

@vanzin (Contributor)

vanzin commented Jun 8, 2017

Merging to master / 2.2.

asfgit pushed a commit that referenced this pull request Jun 8, 2017
## What changes were proposed in this pull request?

Add a new property `spark.streaming.kafka.consumer.cache.enabled` that allows users to enable or disable the cache for Kafka consumers. This property can be especially handy in cases where issues like SPARK-19185 get hit, for which there isn't a solution committed yet. By default, the cache is still on, so this change doesn't alter any out-of-the-box behavior.

## How was this patch tested?
Running unit tests

Author: Mark Grover <mark@apache.org>
Author: Mark Grover <grover.markgrover@gmail.com>

Closes #18234 from markgrover/spark-19185.

(cherry picked from commit 55b8cfe)
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
@asfgit closed this in 55b8cfe on Jun 8, 2017
@markgrover (Member Author)

Thanks @vanzin
