
[SPARK-4964] [Streaming] Exactly-once semantics for Kafka #3798

Closed
wants to merge 45 commits

Conversation

koeninger
Contributor

No description provided.

@AmplabJenkins

Can one of the admins verify this patch?

@jerryshao
Contributor

Hi @koeninger , several simple questions:

  1. How is each RDD partition mapped to a Kafka partition? Is each Kafka partition one RDD partition?
  2. How do you control the rate at which data is pulled in? In other words, how does the current task decide at which offset it should read?
  3. Have you given any consideration to fault tolerance?

In general it is quite similar to a Kafka InputFormat I wrote a while ago (https://github.com/jerryshao/kafka-input-format), which can be loaded by HadoopRDD. I'm not sure whether this is the streaming way of achieving exactly-once semantics.

@koeninger
Contributor Author

Hi @jerryshao

I'd politely ask that anyone with questions read at least KafkaRDD.scala and the example usage linked from the jira ticket (it's only about 50 significant lines of code):
https://github.com/koeninger/kafka-exactly-once/blob/master/src/main/scala/example/TransactionalExample.scala

I'll try to address your points.

  1. Yes, each RDD partition maps directly to a Kafka (topic, partition, inclusive starting offset, exclusive ending offset)
  2. It's a pull model, not a receiver push model. All the InputDStream implementation is doing is checking the leaders' highest offsets and defining an RDD based on that. When the RDD is run, its iterator makes a connection to kafka and pulls the data. This is done because it's simpler, and because using existing network receiver code would require dedicating 1 core per kafka partition, which is unacceptable from an ops standpoint.
  3. Yes. The fault tolerance model is that it should be safe for any or all of the spark machines to be completely destroyed at any point in the job, and the job should be able to be safely restarted. I don't think you can do better than this. This is achieved because all important state, especially the storage of offsets, is controlled by client code, not spark. In both the transactional and idempotent client code approaches, offsets aren't stored until data is stored, so restart should be safe (see the sketch below).
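
To make the idempotent/transactional pattern concrete, here is a minimal sketch in the shape the API ended up taking later in this PR (createDirectStream, HasOffsetRanges). The saveResults and saveOffsets helpers are hypothetical stand-ins for whatever client-side store the application uses, and the broker/topic names are placeholders:

    import kafka.serializer.StringDecoder
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils, OffsetRange}

    object ExactlyOnceSketch {
      // hypothetical client-side stores; in the transactional variant these writes
      // happen in the same database transaction, in the idempotent variant results
      // are written with natural unique keys so replays are harmless
      def saveResults(results: Array[(String, String)]): Unit = { /* e.g. upsert into a DB */ }
      def saveOffsets(ranges: Array[OffsetRange]): Unit = { /* e.g. same DB, offsets table */ }

      def main(args: Array[String]): Unit = {
        val ssc = new StreamingContext(new SparkConf().setAppName("exactly-once-sketch"), Seconds(5))
        val kafkaParams = Map("metadata.broker.list" -> "broker1:9092,broker2:9092")
        val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
          ssc, kafkaParams, Set("someTopic"))

        stream.foreachRDD { rdd =>
          // each partition of this RDD is exactly one Kafka
          // (topic, partition, inclusive starting offset, exclusive ending offset)
          val ranges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
          // data is stored before (or atomically with) its offsets, so a crash can
          // never leave stored offsets pointing past data that was not stored
          saveResults(rdd.collect()) // collect() keeps the sketch short; a real job writes per partition
          saveOffsets(ranges)
        }

        ssc.start()
        ssc.awaitTermination()
      }
    }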

Regarding the approach you linked, the problem there is (a) it's not part of the spark distribution, so people won't know about it, and (b) it assumes control of kafka offsets and their storage in zookeeper, which makes it impossible for client code to control exactly-once semantics.

Regarding the possible semantic disconnect between spark streaming and treating kafka as a durable store of data from the past (assuming that's what you meant)... I agree there is a disconnect there. But it's a fundamental problem with spark streaming in that it implicitly depends on "now" rather than a time embedded in the data stream. I don't think we're fixing that with this ticket.

@@ -44,7 +44,7 @@
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_${scala.binary.version}</artifactId>
<version>0.8.0</version>
Contributor

Why was this necessary? What aspect of this PR depends on this updated version?


Contributor

That being said, @helena may soon update this version anyway in #3631 IIUC.


@huitseeker @koeninger @tdas I do have the initial Kafka 0.8.2 PR in; I'm just waiting to update the version from beta to GA, re-test, and check for any changes/regressions.

@koeninger
Contributor Author

I got some good feedback from Koert Kuipers at Tresata regarding location awareness, so I'll be doing some refactoring to add that.

edit - doing the refactor that Koert and I were discussing is going to be really awkward without some way of knowing what the attempt id is, or at least whether compute() is being called on a retry. I asked a question on the dev list about it.

I still think it's important to get location awareness in there one way or the other.

edit - waiting on the outcome of SPARK-4014, hopefully that will solve the attemptId issue

}
iter = resp.messageSet(part.topic, part.partition)
.iterator
.dropWhile(_.offset < requestOffset)
Contributor

Why is there a drop here? Doesn't the response return messages for the requested offset?

Contributor Author

https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example

"Also note that we are explicitly checking that the offset being read is
not less than the offset that we requested. This is needed since if Kafka
is compressing the messages, the fetch request will return an entire
compressed block even if the requested offset isn't the beginning of the
compressed block. Thus a message we saw previously may be returned again."
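
In other words, the dropWhile is there to discard messages replayed from the start of a compressed block. A comment along these lines in KafkaRDD.scala (sketched here against the PR's fetch-loop variables) would capture it:

    // kafka may return a whole compressed block even if requestOffset falls in the
    // middle of it, so messages with earlier offsets can reappear; skip them rather
    // than handing already-seen messages to the iterator
    iter = resp.messageSet(part.topic, part.partition)
      .iterator
      .dropWhile(_.offset < requestOffset)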


Contributor

Wow! That was not intuitive. Worth mentioning this in the code.

@SparkQA

SparkQA commented Feb 4, 2015

Test build #26706 has started for PR 3798 at commit 59e29f6.

  • This patch merges cleanly.

@tdas
Contributor

tdas commented Feb 4, 2015

Hey Cody, I was trying it and I found an odd behavior. It was printing this repeatedly.

15/02/03 18:22:08 WARN VerifiableProperties: Property metadata.broker.list is not valid

I was using this code.

val kafkaParams = Map[String, String]("metadata.broker.list" -> brokerList)
val lines = KafkaUtils.createNewStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, topicsSet)

I chose "metadata.broker.list" from the code in KafkaCluster, because without that I was getting exception from the KafkaCluster.

@koeninger
Contributor Author

Yeah, there's a weird distinction in Kafka between simple consumers and high level consumers in that they have a lot of common configuration parameters, but one of them talks directly to brokers and the other goes through ZK.

I'll see if I can make a private subclass of ConsumerConfig to shut that warning up.
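
For context, a "simple" (low-level) consumer connects straight to a broker host:port and never touches ZooKeeper. A minimal sketch against the Kafka 0.8 API, with the broker address and client id as placeholders:

    import kafka.consumer.SimpleConsumer

    // connects directly to a broker; no zookeeper.connect involved
    val consumer = new SimpleConsumer("broker1.example.com", 9092,
      /* soTimeout */ 30000, /* bufferSize */ 65536, /* clientId */ "example-client")
    // ... issue fetch / offset requests against this broker, then:
    consumer.close()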


@tdas
Contributor

tdas commented Feb 4, 2015

Why did you choose the parameters "metadata.broker.list" and "bootstrap.servers" as the required Kafka params? I looked at the Kafka docs, and they say that for consumers the necessary properties are "zookeeper.connect" and "group.id". And intuitively the application is consuming, so the consumer configs should apply (not "group.id", but "zookeeper.connect"). So shouldn't our interface also require "zookeeper.connect" rather than the other two?

@koeninger
Contributor Author

High level consumers connect to ZK.

Simple consumers (which is what this is using) connect to brokers directly instead. See https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example

I chose to accept either of the two existing means in Kafka of specifying a list of seed brokers, rather than making up yet a third way.
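
A rough illustration of what accepting either key might look like (the kafkaParams contents are placeholders and the parsing is a sketch, not the PR's exact code):

    // resolve seed brokers from either of Kafka's existing broker-list properties
    val kafkaParams = Map("metadata.broker.list" -> "host1:9092,host2:9092")
    val brokers = kafkaParams.get("metadata.broker.list")
      .orElse(kafkaParams.get("bootstrap.servers"))
      .getOrElse(sys.error("Must specify metadata.broker.list or bootstrap.servers"))

    // "host1:9092,host2:9092" -> Array(("host1", 9092), ("host2", 9092))
    val seedBrokers: Array[(String, Int)] = brokers.split(",").map { hp =>
      val Array(host, port) = hp.split(":")
      (host, port.toInt)
    }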


@SparkQA

SparkQA commented Feb 4, 2015

Test build #26701 has finished for PR 3798 at commit 8c31855.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class DeterministicKafkaInputDStreamCheckpointData extends DStreamCheckpointData(this)
    • class KafkaCluster(val kafkaParams: Map[String, String]) extends Serializable
    • case class LeaderOffset(host: String, port: Int, offset: Long)
    • class KafkaRDDPartition(
    • trait HasOffsetRanges

@AmplabJenkins

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/26701/
Test PASSed.

@tdas
Contributor

tdas commented Feb 4, 2015

I see. ConsumerConfig is really necessary only for the high-level consumer, but you are using it to configure stuff in the low-level consumer as well. That is so that you don't have to introduce parameter strings to configure them yourselves.

Is it possible to assign a fake but verifiable zookeeper.connect?

@koeninger
Contributor Author

Yeah, more importantly it's so defaults for things like connection timeouts match what Kafka provides.

It's possible to assign a fake zookeeper.connect and have it pass verification; that's what the existing code does.

Unfortunately ConsumerConfig has a private constructor, so subclassing it in order for the broker list to pass verification without that warning may prove to be tricky. Worst case scenario, I'll re-implement a config that uses the Kafka defaults.


@SparkQA

SparkQA commented Feb 4, 2015

Test build #26706 has finished for PR 3798 at commit 59e29f6.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class DirectKafkaInputDStreamCheckpointData extends DStreamCheckpointData(this)
    • class KafkaCluster(val kafkaParams: Map[String, String]) extends Serializable
    • case class LeaderOffset(host: String, port: Int, offset: Long)
    • class KafkaRDDPartition(
    • trait HasOffsetRanges

@AmplabJenkins

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/26706/
Test PASSed.

@tdas
Contributor

tdas commented Feb 4, 2015

I think the simplest solution is to assign zookeeper.connect. But you are assigning it in KafkaCluster lines 338 - 345. So why is this warning being thrown?

@jerryshao
Contributor

Hi @tdas , should we add an example to show users how to use this new Kafka API correctly?

@koeninger
Contributor Author

The warning is for metadata.broker.list, since it's not expected by the existing ConsumerConfig (it's used by other config classes).

Couldn't get subclassing to work; the VerifiableProperties class it uses is very dependent on order of operations during construction.

I think the simplest thing is a class that is constructed using kafkaParams, and uses the static defaults from the ConsumerConfig object.

I'm currently waiting in an ER with my child with a 105 fever, so won't be getting to it for a few hours to say the least.

@tdas
Contributor

tdas commented Feb 4, 2015

Holy crap! Don't bother about this at all. This can wait. I hope everything is okay. Take care and all the best!

@SparkQA

SparkQA commented Feb 4, 2015

Test build #26763 has started for PR 3798 at commit 1dc2941.

  • This patch merges cleanly.

@koeninger
Contributor Author

Here's a solution for subclassing ConsumerConfig while still silencing the warning.
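
A rough sketch of the kind of subclass (not necessarily identical to the linked change): keep ConsumerConfig so the Kafka defaults for timeouts etc. still apply, strip the broker-list keys before verification so VerifiableProperties has nothing to warn about, and fill in placeholder values for the properties the high-level consumer requires but the direct consumer never uses:

    import java.util.Properties
    import kafka.consumer.ConsumerConfig

    class SimpleConsumerConfig private (brokers: String, originalProps: Properties)
      extends ConsumerConfig(originalProps) {
      // seed brokers as (host, port) pairs for direct broker connections
      val seedBrokers: Array[(String, Int)] = brokers.split(",").map { hp =>
        val Array(host, port) = hp.split(":")
        (host, port.toInt)
      }
    }

    object SimpleConsumerConfig {
      def apply(kafkaParams: Map[String, String]): SimpleConsumerConfig = {
        val brokers = kafkaParams.get("metadata.broker.list")
          .orElse(kafkaParams.get("bootstrap.servers"))
          .getOrElse(sys.error("Must specify metadata.broker.list or bootstrap.servers"))

        val props = new Properties()
        kafkaParams.foreach { case (key, value) =>
          // leave out the broker keys: ConsumerConfig doesn't recognize them and would warn
          if (key != "metadata.broker.list" && key != "bootstrap.servers") {
            props.put(key, value)
          }
        }
        // required by ConsumerConfig's verification, unused by the direct consumer
        Seq("zookeeper.connect", "group.id").foreach { key =>
          if (!props.containsKey(key)) props.setProperty(key, "")
        }
        new SimpleConsumerConfig(brokers, props)
      }
    }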
My son is doing ok(ish) now, thanks for the concern.

@tdas
Contributor

tdas commented Feb 4, 2015

Glad to hear that your son is doing ok, hope he gets better (okish--> great) real real soon. :)

@tdas
Contributor

tdas commented Feb 4, 2015

That's a nifty solution :) I like it.
Let's merge this as soon as the tests pass. Smaller changes like docs, etc., we can do in the next PR.
@jerryshao I will add the example in the next PR.

@SparkQA

SparkQA commented Feb 4, 2015

Test build #26763 has finished for PR 3798 at commit 1dc2941.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class DirectKafkaInputDStreamCheckpointData extends DStreamCheckpointData(this)
    • class KafkaCluster(val kafkaParams: Map[String, String]) extends Serializable
    • case class LeaderOffset(host: String, port: Int, offset: Long)
    • class KafkaRDDPartition(
    • trait HasOffsetRanges

@AmplabJenkins

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/26763/
Test PASSed.

@tdas
Contributor

tdas commented Feb 4, 2015

Merging this. Thanks so much Cody!
There will be a PR to fix a few things here and there soon.

@harishreedharan
Contributor

Yay!

@asfgit closed this in b0c0021 on Feb 4, 2015
asfgit pushed a commit that referenced this pull request Feb 4, 2015
Author: cody koeninger <cody@koeninger.org>

Closes #3798 from koeninger/kafkaRdd and squashes the following commits:

1dc2941 [cody koeninger] [SPARK-4964] silence ConsumerConfig warnings about broker connection props
59e29f6 [cody koeninger] [SPARK-4964] settle on "Direct" as a naming convention for the new stream
8c31855 [cody koeninger] [SPARK-4964] remove HasOffsetRanges interface from return types
0df3ebe [cody koeninger] [SPARK-4964] add comments per pwendell / dibbhatt
8991017 [cody koeninger] [SPARK-4964] formatting
825110f [cody koeninger] [SPARK-4964] rename stuff per TD
4354bce [cody koeninger] [SPARK-4964] per td, remove java interfaces, replace with final classes, corresponding changes to KafkaRDD constructor and checkpointing
9adaa0a [cody koeninger] [SPARK-4964] formatting
0090553 [cody koeninger] [SPARK-4964] javafication of interfaces
9a838c2 [cody koeninger] [SPARK-4964] code cleanup, add more tests
2b340d8 [cody koeninger] [SPARK-4964] refactor per TD feedback
80fd6ae [cody koeninger] [SPARK-4964] Rename createExactlyOnceStream so it isnt over-promising, change doc
99d2eba [cody koeninger] [SPARK-4964] Reduce level of nesting.  If beginning is past end, its actually an error (may happen if Kafka topic was deleted and recreated)
19406cc [cody koeninger] Merge branch 'master' of https://github.com/apache/spark into kafkaRdd
2e67117 [cody koeninger] [SPARK-4964] one potential way of hiding most of the implementation, while still allowing access to offsets (but not subclassing)
bb80bbe [cody koeninger] [SPARK-4964] scalastyle line length
d4a7cf7 [cody koeninger] [SPARK-4964] allow for use cases that need to override compute for custom kafka dstreams
c1bd6d9 [cody koeninger] [SPARK-4964] use newly available attemptNumber for correct retry behavior
548d529 [cody koeninger] Merge branch 'master' of https://github.com/apache/spark into kafkaRdd
0458e4e [cody koeninger] [SPARK-4964] recovery of generated rdds from checkpoint
e86317b [cody koeninger] [SPARK-4964] try seed brokers in random order to spread metadata requests
e93eb72 [cody koeninger] [SPARK-4964] refactor to add preferredLocations.  depends on SPARK-4014
356c7cc [cody koeninger] [SPARK-4964] code cleanup per helena
adf99a6 [cody koeninger] [SPARK-4964] fix serialization issues for checkpointing
1d50749 [cody koeninger] [SPARK-4964] code cleanup per tdas
8bfd6c0 [cody koeninger] [SPARK-4964] configure rate limiting via spark.streaming.receiver.maxRate
e09045b [cody koeninger] [SPARK-4964] add foreachPartitionWithIndex, to avoid doing equivalent map + empty foreach boilerplate
cac63ee [cody koeninger] additional testing, fix fencepost error
37d3053 [cody koeninger] make KafkaRDDPartition available to users so offsets can be committed per partition
bcca8a4 [cody koeninger] Merge branch 'master' of https://github.com/apache/spark into kafkaRdd
6bf14f2 [cody koeninger] first attempt at a Kafka dstream that allows for exactly-once semantics
326ff3c [cody koeninger] add some tests
38bb727 [cody koeninger] give easy access to the parameters of a KafkaRDD
979da25 [cody koeninger] dont allow empty leader offsets to be returned
8d7de4a [cody koeninger] make sure leader offsets can be found even for leaders that arent in the seed brokers
4b078bf [cody koeninger] differentiate between leader and consumer offsets in error message
3c2a96a [cody koeninger] fix scalastyle errors
29c6b43 [cody koeninger] cleanup logging
783b477 [cody koeninger] update tests for kafka 8.1.1
7d050bc [cody koeninger] methods to set consumer offsets and get topic metadata, switch back to inclusive start / exclusive end to match typical kafka consumer behavior
ce91c59 [cody koeninger] method to get consumer offsets, explicit error handling
4dafd1b [cody koeninger] method to get leader offsets, switch rdd bound to being exclusive start, inclusive end to match offsets typically returned from cluster
0b94b33 [cody koeninger] use dropWhile rather than filter to trim beginning of fetch response
1d70625 [cody koeninger] WIP on kafka cluster
76913e2 [cody koeninger] Batch oriented kafka rdd, WIP. todo: cluster metadata / finding leader

(cherry picked from commit b0c0021)
Signed-off-by: Tathagata Das <tathagata.das1565@gmail.com>