
[SPARK-18176][Streaming][Kafka] Kafka010 .createRDD() scala API should expect scala Map #15681

Closed
wants to merge 5 commits

Conversation

Contributor

@lw-lin lw-lin commented Oct 29, 2016

What changes were proposed in this pull request?

Throughout external/kafka-010, Java APIs expect java.util.Maps and Scala APIs expect scala.collection.Maps, with the exception of the KafkaUtils.createRDD() Scala API, which expects a java.util.Map.

This patch has added another createRDD overload which takes a scala.collection.Map.
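Such an overload can delegate to the existing java.util.Map variant with a collection conversion. A minimal sketch, assuming an illustrative object name and a placeholder body (this is not Spark's actual KafkaUtils code):

```scala
import java.{util => ju}
import scala.collection.JavaConverters._

// Illustrative stand-in, NOT Spark's actual KafkaUtils: the placeholder
// body just counts params instead of building a KafkaRDD.
object KafkaUtilsSketch {
  // Existing Java-friendly variant taking a java.util.Map.
  def createRDD(kafkaParams: ju.Map[String, Object]): Int =
    kafkaParams.size()

  // The added Scala-friendly overload: convert (defensive copy) and delegate.
  def createRDD(kafkaParams: collection.Map[String, Object]): Int =
    createRDD(new ju.HashMap[String, Object](kafkaParams.asJava))
}
```

A Scala caller can then pass a plain Scala Map and overload resolution picks the new variant.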

How was this patch tested?

Existing test cases, plus a new basic test case.

* <a href="http://kafka.apache.org/documentation.html#newconsumerconfigs">
* configuration parameters</a>.
* Requires "bootstrap.servers" to be set with Kafka broker(s),
* NOT zookeeper servers, specified in host1:port1,host2:port2 form.
Contributor Author

@lw-lin lw-lin Oct 29, 2016

We don't have an executorKafkaParams param at all; same below.
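The scaladoc quoted above requires "bootstrap.servers" (Kafka brokers, not ZooKeeper). A minimal sketch of such a kafkaParams map; the broker addresses and group id are placeholders, while the config keys are standard new-consumer configs:

```scala
// Placeholder broker addresses and group id; keys are standard
// Kafka new-consumer configs.
val kafkaParams: Map[String, Object] = Map(
  "bootstrap.servers"  -> "host1:9092,host2:9092", // brokers, NOT ZooKeeper
  "key.deserializer"   -> "org.apache.kafka.common.serialization.StringDeserializer",
  "value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
  "group.id"           -> "example-group"
)
```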

@@ -56,32 +57,18 @@ object KafkaUtils extends Logging {
   @Experimental
   def createRDD[K, V](
       sc: SparkContext,
-      kafkaParams: ju.Map[String, Object],
+      kafkaParams: collection.Map[String, Object],
Contributor Author

@lw-lin lw-lin Oct 29, 2016

This is the change; it's a public API change, but I think it would be OK.

fixKafkaParams(kp)
val osr = offsetRanges.clone()

new KafkaRDD[K, V](sc, kp, osr, preferredHosts, true)
Contributor Author

@lw-lin lw-lin Oct 29, 2016

All of this has been moved, untouched, into the private createRDDInternal() method.

@SparkQA

SparkQA commented Oct 29, 2016

Test build #67761 has finished for PR 15681 at commit 7fb71ef.

  • This patch fails MiMa tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Oct 29, 2016

Test build #67763 has finished for PR 15681 at commit 408f8a1.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@lw-lin
Contributor Author

lw-lin commented Oct 31, 2016

@koeninger could you also take a look at this, thanks!

@koeninger
Contributor

Public API changes even on an experimental module aren't a minor thing, open a Jira.

It's also better not to lump unrelated changes together.

@lw-lin lw-lin changed the title [Minor][Streaming][Kafka] Kafka010 .createRDD() scala API should expect scala Map [SPARK-18176][Streaming][Kafka] Kafka010 .createRDD() scala API should expect scala Map Oct 31, 2016
@lw-lin
Contributor Author

lw-lin commented Oct 31, 2016

@koeninger understood, and thanks! JIRA opened and unrelated changes removed.

Let's see what Jenkins says.

@SparkQA

SparkQA commented Oct 31, 2016

Test build #67800 has finished for PR 15681 at commit 9d6bd36.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@koeninger
Contributor

koeninger commented Oct 31, 2016

I think this should just be another createRDD overload that takes a scala map. The minor additional maintenance overhead of that method as opposed to changing the existing one isn't worth breaking clients. I'm all in favor of changes to experimental apis when they are necessary, but this isn't necessary.

That also makes most of the other changes in this PR unnecessary - you don't need to touch mima, you don't need to refactor to createRddInternal, etc etc.

@lw-lin
Contributor Author

lw-lin commented Nov 1, 2016

@koeninger that makes sense and thanks!

Reverted all changes and added another createRDD overload. :-)

@SparkQA

SparkQA commented Nov 1, 2016

Test build #67879 has finished for PR 15681 at commit 65f814c.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@koeninger
Contributor

LGTM, thanks.

If you want to open a separate PR to cleanup the private doc issues you noticed, go for it, shouldn't need another Jira imho if it isn't changing code

@lw-lin
Contributor Author

lw-lin commented Nov 1, 2016

Thank you @koeninger !

Let me also cc @srowen, who's been around, to take a look.

@srowen
Member

srowen commented Nov 1, 2016

Should we then deprecate the existing method this is 'replacing', if it's not going to be removed? The message I get is that the existing one shouldn't really be used; otherwise it's hard to understand why both exist when looking at the API.

@lw-lin
Contributor Author

lw-lin commented Nov 1, 2016

Deprecating the existing one would mean we'd still need to introduce some createRDDInternal and let the deprecated one call it; then we could remove the deprecated one some time in the future?

I don't have strong preferences here :)

@srowen
Member

srowen commented Nov 1, 2016

We would want to change the code so that nothing calls the deprecated method including tests. I think you can let the deprecated one call the new one? When to remove it is a question for the future, yes.
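If deprecation were chosen, the shape described here (the deprecated method calling the new one) might look like the following sketch. The signatures and bodies are simplified placeholders, not Spark's actual code:

```scala
import java.{util => ju}
import scala.collection.JavaConverters._

// Simplified sketch, not Spark's actual signatures; bodies are placeholders.
object DeprecationSketch {
  // New variant taking a Scala Map.
  def createRDD(kafkaParams: collection.Map[String, Object]): Int =
    kafkaParams.size

  // Old variant kept for binary/source compatibility, delegating to the new one.
  @deprecated("use the scala.collection.Map overload", "2.1.0")
  def createRDD(kafkaParams: ju.Map[String, Object]): Int =
    createRDD(kafkaParams.asScala)
}
```

With this shape, nothing but the deprecated method itself needs the java.util.Map path, so tests can migrate to the new overload.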

@koeninger
Contributor

I don't think there's a reason to deprecate it. ju.Map is the lowest common denominator for kafka params, it's used by the underlying consumer, and it's what the ConsumerStrategy interface expects to return. I would definitely prefer to use it in my Scala jobs that were generating a sequence of RDDs.


@srowen
Member

srowen commented Nov 1, 2016

I see, so it's not an 'oversight'. If you're saying you prefer a Java Map in this API, is there much value in an overload for a Scala Map? The conversion is just one method call.
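The one-method-call conversion referred to here is presumably JavaConverters' asJava (an assumption; any equivalent bridge works), which lets a Scala caller use the existing java.util.Map API directly:

```scala
import java.{util => ju}
import scala.collection.JavaConverters._

val scalaParams: Map[String, Object] = Map("bootstrap.servers" -> "host1:9092")

// One method call bridges a Scala Map to the java.util.Map the API expects.
val javaParams: ju.Map[String, Object] = scalaParams.asJava
```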

@koeninger
Contributor

I'm agnostic on the value of adding the overload, if @lw-lin thinks it's more convenient for users. There are considerably fewer overloads as it stands than the old 0.8 version of KafkaUtils, so I don't think we're getting too messy/crowded yet.

@srowen
Member

srowen commented Nov 1, 2016

@lw-lin are you persuaded or still feel it's fairly valuable for users? I guess I'm neutral too.

@lw-lin
Contributor Author

lw-lin commented Nov 2, 2016

I'm closing this for now; can re-open if other people demand it. Thanks!

@lw-lin lw-lin closed this Nov 2, 2016