[SPARK-4964][Streaming][Kafka] More updates to Exactly-once Kafka stream #4384
Conversation
Test build #26822 has started for PR 4384 at commit
/**
 * Consumes messages from one or more topics in Kafka and does wordcount.
 * Usage: DirectKafkaWordCount <brokers> <topics>
 *   <brokers> is a list of one or more zookeeper servers that make quorum
These are kafka servers, not zookeeper servers
Right. Thanks!
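For context, the fix here means the `<brokers>` usage argument describes Kafka brokers (not ZooKeeper servers), given in `host1:port1,host2:port2` form. A minimal, hypothetical sketch of parsing such a broker list — the helper name is invented for illustration and is not code from the PR:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper illustrating the <brokers> argument format
// ("host1:port1,host2:port2"); not part of the PR itself.
public class BrokerList {
    public static List<String[]> parse(String brokers) {
        List<String[]> result = new ArrayList<>();
        for (String hostPort : brokers.split(",")) {
            // Each comma-separated entry is expected to be "host:port".
            result.add(hostPort.split(":"));
        }
        return result;
    }
}
```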
@koeninger Can you please take a look?
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.util.Utils

class DirectKafkaStreamSuite extends KafkaStreamSuiteBase
Just renamed the file and test suite class from KafkaDirectStreamSuite to DirectKafkaStreamSuite, but Git/GitHub treated it as a move. The first unit test is unmodified.
Test build #26822 has finished for PR 4384 at commit
Test FAILed.
Test build #586 has started for PR 4384 at commit
Test build #586 has finished for PR 4384 at commit
Test build #26878 has started for PR 4384 at commit
Test build #26878 has finished for PR 4384 at commit
Test FAILed.
Test build #26888 has started for PR 4384 at commit
Test build #26891 has started for PR 4384 at commit
Test build #26888 has finished for PR 4384 at commit
Test PASSed.
Test build #26891 has finished for PR 4384 at commit
Test PASSed.
Test build #589 has started for PR 4384 at commit
Test build #589 has finished for PR 4384 at commit
 * @param batch Each KafkaRDDPartition in the batch corresponds to a
 *   range of offsets for a given Kafka topic/partition
 * configuration parameters</a>. Requires "metadata.broker.list" or "bootstrap.servers" to be set
 *   with Kafka broker(s) specified in host1:port1,host2:port2 form.
You removed the "not zookeeper servers" note here, but left it in for the other methods that take kafkaParams
The only reason we were writing "not zookeepers" was to make the difference from the earlier stream clear, for people who want to switch from the old one to the new. That applies to the public API; this is internal, private API. I can add it back, no issues.
Thanks for adding the Java-friendly KafkaUtils methods. Your original reason for wanting Array[Leader] rather than Map[TopicAndPartition, Broker] was Java compatibility. But since there are separate Scala and Java-specific KafkaUtils.createRDD methods now, couldn't the Scala one take a Map and the Java one take a JMap?

As it stands, what is the meaning if someone passes in an array with multiple Leader objects that have the same topic and partition but different brokers? The first thing we do with the array of leaders is convert it to a map, so it seems better to just take a map and avoid both the possibility of confusion and the extra conversion.
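The ambiguity described above — two leader entries for the same topic/partition but different brokers — can be seen with plain collections. A self-contained sketch using simplified, hypothetical stand-ins (not the real Kafka `TopicAndPartition`/`Leader` classes): converting the list to a map silently keeps only the last entry per key, so the caller's intent for the duplicate is lost.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified stand-in for Kafka's Leader class, used only to
// illustrate the duplicate-key question raised in the review.
public class LeaderMapDemo {
    static final class Leader {
        final String topic; final int partition; final String host;
        Leader(String topic, int partition, String host) {
            this.topic = topic; this.partition = partition; this.host = host;
        }
    }

    // Keying by topic/partition drops all but the last broker for a
    // duplicated key -- the confusion a Map parameter would avoid.
    public static Map<String, String> toBrokerMap(List<Leader> leaders) {
        Map<String, String> byPartition = new HashMap<>();
        for (Leader l : leaders) {
            byPartition.put(l.topic + "-" + l.partition, l.host);
        }
        return byPartition;
    }
}
```

Taking a map up front pushes the "one broker per topic/partition" invariant onto the caller instead of resolving it silently inside the library.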
 * :: Experimental ::
 * Represents any object that has a collection of [[OffsetRange]]s. This can be used to access the
 * offset ranges in RDDs generated by the direct Kafka DStream (see
 * [[KafkaUtils.createDirectStream()]]).
It's probably good for the doc for createDirectStream to link to here, in addition to the other way around
Good call. Let me add the references.
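The pattern being documented here is that each RDD produced by the direct stream exposes the Kafka offset ranges it covers. A self-contained sketch using simplified mirrors of the classes discussed in this PR (the real `OffsetRange` and `HasOffsetRanges` live in `org.apache.spark.streaming.kafka`; these stand-ins are only illustrative):

```java
import java.util.List;

// Simplified mirrors of the OffsetRange / HasOffsetRanges pair
// discussed in the PR; not the real Spark classes.
public class OffsetDemo {
    static final class OffsetRange {
        final String topic; final int partition;
        final long fromOffset; final long untilOffset;
        OffsetRange(String topic, int partition, long from, long until) {
            this.topic = topic; this.partition = partition;
            this.fromOffset = from; this.untilOffset = until;
        }
        // Number of messages covered by this range (until is exclusive).
        long count() { return untilOffset - fromOffset; }
    }

    // Anything exposing its offset ranges, analogous to HasOffsetRanges.
    interface HasOffsetRanges {
        List<OffsetRange> offsetRanges();
    }

    // Total number of messages covered by one batch's offset ranges.
    public static long totalMessages(HasOffsetRanges batch) {
        long total = 0;
        for (OffsetRange r : batch.offsetRanges()) total += r.count();
        return total;
    }
}
```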
Test build #26966 has started for PR 4384 at commit
Test build #26966 has finished for PR 4384 at commit
Test PASSed.
@pwendell Could you take a look at the Scala docs?
Test build #27078 has started for PR 4384 at commit
Test build #27078 has finished for PR 4384 at commit
Test PASSed.
/**
 * :: Experimental ::
 * Create an input stream that pulls messages from a Kafka Broker. This stream can guarantee
Does the other one pull messages from a Kafka Broker as well? It might be good to focus on something that is distinctive.
What about saying "without using receivers" at the end?
Test build #27178 has started for PR 4384 at commit
Changes
- Added example
- Added a critical unit test that verifies that offset ranges can be recovered through checkpoints

Might add more changes.

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #4384 from tdas/new-kafka-fixes and squashes the following commits:

7c931c3 [Tathagata Das] Small update
3ed9284 [Tathagata Das] updated scala doc
83d0402 [Tathagata Das] Added JavaDirectKafkaWordCount example.
26df23c [Tathagata Das] Updates based on PR comments from Cody
e4abf69 [Tathagata Das] Scala doc improvements and stuff.
bb65232 [Tathagata Das] Fixed test bug and refactored KafkaStreamSuite
50f2b56 [Tathagata Das] Added Java API and added more Scala and Java unit tests. Also updated docs.
e73589c [Tathagata Das] Minor changes.
4986784 [Tathagata Das] Added unit test to kafka offset recovery
6a91cab [Tathagata Das] Added example

(cherry picked from commit c151346)
Signed-off-by: Tathagata Das <tathagata.das1565@gmail.com>
Test build #27178 has finished for PR 4384 at commit
Test PASSed.
…Suite

The test was incorrect. Instead of counting the number of records, it counted the number of partitions of the RDDs generated by the DStream, which was not its intention. I will be testing this patch multiple times to understand its flakiness.

PS: This was caused by my refactoring in #4384. @koeninger check it out.

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #4597 from tdas/kafka-flaky-test and squashes the following commits:

d236235 [Tathagata Das] Unignored last test.
e9a1820 [Tathagata Das] fix test

(cherry picked from commit 3912d33)
Signed-off-by: Tathagata Das <tathagata.das1565@gmail.com>
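The bug this commit fixes — counting partitions instead of records — is easy to reproduce with plain collections. A hedged sketch (not the actual test code; the nested lists stand in for records spread across an RDD's partitions):

```java
import java.util.List;

// Illustrates the test bug described in the commit message: counting
// the partitions of the generated RDDs instead of the records in them.
public class CountDemo {
    public static int partitionCount(List<List<String>> partitions) {
        return partitions.size();             // what the broken test measured
    }

    public static int recordCount(List<List<String>> partitions) {
        int total = 0;
        for (List<String> p : partitions) total += p.size();
        return total;                         // what it should have measured
    }
}
```

The two counts only coincide when every partition holds exactly one record, which is why the broken assertion passed on some runs and failed on others.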