This repository has been archived by the owner on Mar 3, 2023. It is now read-only.

Feature/create kafka spout #3198

Merged
merged 10 commits into from
Apr 1, 2019

Conversation

simingweng
Contributor

This is a KafkaSpout implemented using the Heron Spout API.

It supports both the AT_MOST_ONCE and AT_LEAST_ONCE message delivery guarantees, and there is a sample project showcasing how it is used in the simplest topology on a Simulator. The sample topology requires a local Kafka broker to run and test.

The project is currently a Maven project rooted at incubator-heron/contrib/spouts/kafka/java/heron-kafka-spout-parent and depends on the publicly available Maven artifact heron-api. When it is converted to a Bazel target, it should cross-reference the in-tree Heron API target.

Contributor

@nwangtw nwangtw left a comment


Scanned through briefly. Overall LGTM.

Adding @nlu90 as a reviewer, who has more experience with the Kafka spout.

@nwangtw nwangtw requested a review from nlu90 February 26, 2019 16:13
@spennymac

@simingweng I'm having trouble following what you are doing when there is a failed tuple. It's not that your code is confusing; I am just missing something and would like an explanation. Are you replaying all tuples from the most recent failed offset?

From what I have read in the code, I believe this is your process; is this correct?

  1. Add offset to failure registry.
  2. Check if there is a failure.
  3. Seek to failure offset.
  4. Remove previous acks.
  5. Continue emitting.

The code looks good. I plan on using this shortly and will hopefully have more feedback after I get into it.

Thanks!

@simingweng
Contributor Author

The Travis CI failed due to the following test failure:

It seems the actual JSON output merely had a different ordering in the JSON array compared to the expected output, which, I'm pretty sure, has nothing to do with the Maven test I added for the Kafka spout. Can someone familiar with the integration test code base help with this IntegrationTest_SlidingTimeWindow1 test failure?

[2019-02-27 18:05:29 +0000] [ERROR]: Actual result did not match expected result
[2019-02-27 18:05:29 +0000] [INFO]: Actual result ----------
['{"1":[{"tuplesInWindow":["1"]},{"newTuples":["1"]},{"expiredTuples":[]}]}', '{"2":[{"tuplesInWindow":["2","3"]},{"newTuples":["2","3"]},{"expiredTuples":["1"]}]}', '{"3":[{"tuplesInWindow":["4"]},{"newTuples":["4"]},{"expiredTuples":["2","3"]}]}', '{"4":[{"tuplesInWindow":["5"]},{"newTuples":["5"]},{"expiredTuples":["4"]}]}', '{"5":[{"tuplesInWindow":["6"]},{"newTuples":["6"]},{"expiredTuples":["5"]}]}', '{"6":[{"tuplesInWindow":["7"]},{"newTuples":["7"]},{"expiredTuples":["6"]}]}', '{"7":[{"tuplesInWindow":["8"]},{"newTuples":["8"]},{"expiredTuples":["7"]}]}', '{"8":[{"tuplesInWindow":["9"]},{"newTuples":["9"]},{"expiredTuples":["8"]}]}', '{"9":[{"tuplesInWindow":["10"]},{"newTuples":["10"]},{"expiredTuples":["9"]}]}']
[2019-02-27 18:05:29 +0000] [INFO]: Expected result ----------
['{"1":[{"tuplesInWindow":["1"]},{"newTuples":["1"]},{"expiredTuples":[]}]}', '{"10":[{"tuplesInWindow":["10"]},{"newTuples":["10"]},{"expiredTuples":["9"]}]}', '{"2":[{"tuplesInWindow":["2"]},{"newTuples":["2"]},{"expiredTuples":["1"]}]}', '{"3":[{"tuplesInWindow":["3"]},{"newTuples":["3"]},{"expiredTuples":["2"]}]}', '{"4":[{"tuplesInWindow":["4"]},{"newTuples":["4"]},{"expiredTuples":["3"]}]}', '{"5":[{"tuplesInWindow":["5"]},{"newTuples":["5"]},{"expiredTuples":["4"]}]}', '{"6":[{"tuplesInWindow":["6"]},{"newTuples":["6"]},{"expiredTuples":["5"]}]}', '{"7":[{"tuplesInWindow":["7"]},{"newTuples":["7"]},{"expiredTuples":["6"]}]}', '{"8":[{"tuplesInWindow":["8"]},{"newTuples":["8"]},{"expiredTuples":["7"]}]}', '{"9":[{"tuplesInWindow":["9"]},{"newTuples":["9"]},{"expiredTuples":["8"]}]}']
[2019-02-27 18:05:29 +0000] [ERROR]: Checking result failed for 20190227175427_IntegrationTest_SlidingTimeWindow1_e8519bcd-2a19-4971-b5ef-6bcb1dbf0b6e topology :: Traceback (most recent call last):
  File "integration_test/src/python/test_runner/main.py", line 204, in run_test
    return results_checker.check_results()
  File "integration_test/src/python/test_runner/main.py", line 123, in check_results
    return self._compare(expected_results, actual_results)
  File "integration_test/src/python/test_runner/main.py", line 136, in _compare
    raise failure
TestFailure: ('Actual result did not match expected result', None)

@simingweng
Contributor Author

simingweng commented Mar 2, 2019

@worlvlhole
Yes, I think you've got most of it. The KafkaSpout can operate in two different reliability modes, ATMOST_ONCE or ATLEAST_ONCE (I haven't added the EFFECTIVE_ONCE implementation yet; I'm working on it).

ATMOST_ONCE mode

The topology does not turn the acking mechanism on at all, so the KafkaSpout can afford to emit tuples without any message ID, and it immediately commits the currently-read offset back to the Kafka broker; neither the ack() nor the fail() callback is ever invoked. Therefore, "in-flight" tuples are simply lost if the KafkaSpout instance crashes or the topology is restarted. That's what ATMOST_ONCE offers.

ATLEAST_ONCE mode

The acking mechanism is turned on topology-wide, so the KafkaSpout uses an ack registry to track the continuous acknowledgement ranges for each partition, while a failure registry tracks the lowest failed offset for each partition. When the Kafka consumer needs to poll the Kafka cluster for more records (because the spout has emitted everything it got from the previous poll), the KafkaSpout reconciles as follows for each partition it is consuming:

  1. if there is any failed tuple, seek back to the lowest failed offset
  2. discard all acknowledgements received for offsets greater than the lowest failed offset
  3. clear the lowest failed offset in the failure registry
  4. commit the offset at the upper boundary of the first range in the ack registry

So it guarantees that each tuple emitted by the KafkaSpout is successfully processed across the whole topology at least once.
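The reconciliation steps above can be sketched with plain data structures. This is an illustrative model only, not the PR's actual classes; the names PartitionAckState, ack, fail, reconcile, and offsetToCommit are hypothetical, and the ack registry is approximated with a TreeMap of continuous offset ranges:

```java
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

// Illustrative sketch of per-partition ATLEAST_ONCE bookkeeping (hypothetical names).
public class PartitionAckState {
    // Continuous acknowledged ranges: start offset -> exclusive end offset.
    private final NavigableMap<Long, Long> ackRanges = new TreeMap<>();
    // Lowest failed offset for this partition, or null if none.
    private Long lowestFailedOffset = null;

    // Record an ack, merging it into any adjacent acknowledged range.
    public void ack(long offset) {
        long start = offset;
        long end = offset + 1;
        Map.Entry<Long, Long> prev = ackRanges.floorEntry(offset);
        if (prev != null && prev.getValue() >= offset) {
            if (prev.getValue() >= end) {
                return; // already acknowledged
            }
            start = prev.getKey();
            ackRanges.remove(prev.getKey());
        }
        Long followingEnd = ackRanges.remove(end);
        if (followingEnd != null) {
            end = followingEnd;
        }
        ackRanges.put(start, end);
    }

    // The failure registry keeps only the lowest failed offset.
    public void fail(long offset) {
        if (lowestFailedOffset == null || offset < lowestFailedOffset) {
            lowestFailedOffset = offset;
        }
    }

    // Steps 1-3: returns the offset to seek back to, or null when no failure is pending.
    public Long reconcile() {
        if (lowestFailedOffset == null) {
            return null;
        }
        long seekTo = lowestFailedOffset;
        // Step 2: drop acknowledgements at or beyond the failed offset.
        ackRanges.tailMap(seekTo).clear();
        Map.Entry<Long, Long> prev = ackRanges.floorEntry(seekTo);
        if (prev != null && prev.getValue() > seekTo) {
            ackRanges.put(prev.getKey(), seekTo); // trim a range spanning the failed offset
        }
        lowestFailedOffset = null; // step 3
        return seekTo;
    }

    // Step 4: the offset to commit is the upper bound of the first acked range.
    public Long offsetToCommit() {
        return ackRanges.isEmpty() ? null : ackRanges.firstEntry().getValue();
    }

    public static void main(String[] args) {
        PartitionAckState state = new PartitionAckState();
        state.ack(0);
        state.ack(1);
        state.ack(3); // offset 2 is still outstanding
        System.out.println("commit: " + state.offsetToCommit()); // 2
        state.fail(2);
        System.out.println("seek: " + state.reconcile()); // 2
        System.out.println("commit: " + state.offsetToCommit()); // 2
    }
}
```

In the real spout, the seek and commit would be applied to the Kafka consumer (e.g. via its seek and commitSync calls); the sketch only models the registry logic.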

Not Implemented

What is still missing in this Kafka spout implementation is handling of the EFFECTIVE_ONCE scenario, which should rely entirely on the checkpointing mechanism to decide how far it needs to rewind. I'm working on it right now.

I know this is a lot of information; I'm writing a README to explain things in more detail and will keep updating the pull request.

@spennymac

@simingweng Thank you so much for the in depth explanation!

Contributor

@nwangtw nwangtw left a comment


Overall LGTM. A good start for iteration. Thanks!

@nwangtw
Contributor

nwangtw commented Mar 12, 2019

(quoting @simingweng's explanation of the ATMOST_ONCE and ATLEAST_ONCE modes above)

Perhaps this information should be included in the documentation?

@rohanag12
Contributor

Hi, thanks for this PR, happy to finally see a Kafka Spout implementation in Heron. I am planning on using this once it is merged, but I have a question. One major difference between this implementation and the Storm one is that Storm's spout allows emitting to different streams, using the org.apache.storm.kafka.spout.RecordTranslator interface. This implementation is missing this particular functionality, which is quite useful.

Is there a reason for not keeping this functionality in this Kafka Spout implementation? Or is there another way to achieve similar functionality (other than creating a map-function like bolt for this purpose)? It's really useful for sending data from different topics to different downstream bolts.

@spennymac

spennymac commented Mar 18, 2019 via email

@nwangtw
Contributor

nwangtw commented Mar 19, 2019

Slightly unrelated, but I think the need to set out a list of spout requirements is still a big discussion that's needed.

Yeah. The requirements will evolve. Feel free to add things into the README file.

@nwangtw
Contributor

nwangtw commented Mar 19, 2019

(quoting @rohanag12's question above about emitting to different streams)

Interesting. Is topic -> stream 1-to-1? Why not have multiple spout components, each responsible for one topic?

@simingweng
Contributor Author

simingweng commented Mar 19, 2019

(quoting @rohanag12's question above about emitting to different streams)

Very good question. I actually started with a "one-record-to-many-tuples" implementation, then gave it deeper thought while implementing the ATLEAST_ONCE delivery guarantee. Allowing "one-record-to-many-tuples" significantly complicates the acknowledgement-tracking algorithm, because we would then have to track the mapping between a single Kafka record offset and multiple message IDs.

We would also face a design choice: should the KafkaSpout itself decide the uniqueness of a set of message IDs coming from the same ConsumerRecord, or should we leave that choice up to the developer?

So a neater choice is to use multiple KafkaSpouts, each dedicated to an output stream.

That said, I do agree "one-record-to-many-tuples" is pretty useful and cost-effective in terms of resource consumption. I have no objection to putting it back in, but then it becomes the developer's responsibility to avoid emitting multiple tuples out of one ConsumerRecord when running in ATLEAST_ONCE mode, at least for this version of the KafkaSpout, before we introduce a more sophisticated ack/fail tracking mechanism.
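To illustrate the complication described above, a one-record-to-many-tuples spout would have to count outstanding message IDs per offset and treat an offset as done only when every tuple emitted from it is acked. The sketch below is hypothetical (MultiTupleAckTracker and its methods are illustrative names, not the PR's code):

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of why one-record-to-many-tuples complicates acking:
// an offset is only safe to commit once every tuple emitted from it is acked.
public class MultiTupleAckTracker {
    // offset -> number of emitted tuples not yet acknowledged
    private final Map<Long, Integer> pendingPerOffset = new HashMap<>();

    public void emitted(long offset, int tupleCount) {
        pendingPerOffset.put(offset, tupleCount);
    }

    // Returns true once the whole record at this offset is fully acknowledged.
    public boolean ack(long offset) {
        int remaining = pendingPerOffset.merge(offset, -1, Integer::sum);
        if (remaining == 0) {
            pendingPerOffset.remove(offset);
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        MultiTupleAckTracker tracker = new MultiTupleAckTracker();
        tracker.emitted(5L, 3); // one record at offset 5 became three tuples
        System.out.println(tracker.ack(5L)); // false
        System.out.println(tracker.ack(5L)); // false
        System.out.println(tracker.ack(5L)); // true
    }
}
```

With one tuple per record this collapses to simple per-offset range tracking, which is why keeping the 1:1 restriction makes the ATLEAST_ONCE logic much simpler.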

@rohanag12
Contributor

Interesting. Is topic -> stream 1-to-1? Why not have multiple spout components, each responsible for one topic?

Topic -> stream is 1 -> 1 in our use case, but in Storm's implementation it can also be configured as many -> 1.
Running multiple spouts adds the overhead of running more Heron instances and Kafka consumers. We operate in a resource-constrained environment, so that is not always feasible.

(quoting @simingweng's reply above about one-record-to-many-tuples)

Agreed that one record -> many tuples is really useful, but it does make tracking offsets a lot harder. However, I don't think that is possible with the current Kafka spout implementation in Storm either; it only allows 1 record -> 1 tuple emits, but to any declared stream.
Having the ability to choose the stream for a given record would be a good start for the Heron Kafka spout, and it's much easier to implement.

@nwangtw
Contributor

nwangtw commented Mar 19, 2019

(quoting the topic -> stream exchange above)

Got it, thanks. many -> 1 does sound useful in a resource-constrained environment. It should be doable, but there might be extra logic and configuration. I am thinking that maybe it could be created as a separate spout that shares code with this 1:1 version.

(quoting @simingweng's and @rohanag12's replies above about one-record-to-many-tuples and choosing an output stream)

Are these streams the same, or are they different (e.g. applying different filters/transforms)? If they are the same, what is the difference between this and a spout with one output stream that multiple bolts register to? Different offsets?

@simingweng
Contributor Author

I think @rohanag12's use case is that he has multiple topics and the record from each topic is translated into exactly one tuple and emitted to one stream. Basically, it is "topic1" -> "stream1", "topic2" -> "stream2", etc. So he would like to use one KafkaSpout, subscribing to multiple topics, and have the ConsumerRecordTransformer declare multiple output streams and route each translated tuple to its destined stream based on the topic of the received record.

This pattern saves the user one extra "distributor" bolt connected to the KafkaSpout; the routing logic becomes embedded in the KafkaSpout itself. But again, the trade-off of this flexibility is that the developer needs to be careful not to emit multiple tuples out of one single record in ATLEAST_ONCE mode.

So, as long as one record is translated into exactly one emitted tuple, the current KafkaSpout implementation should work in ATLEAST_ONCE mode.
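The topic-to-stream routing described above could look like the sketch below. TopicStreamRouter and streamFor are hypothetical names for illustration only; the PR's actual extension point is the ConsumerRecordTransformer interface:

```java
import java.util.Map;

// Hypothetical sketch of one-record-to-one-tuple routing by topic; safe under
// ATLEAST_ONCE because each record still produces exactly one tuple.
public class TopicStreamRouter {
    private final Map<String, String> topicToStream;
    private final String defaultStream;

    public TopicStreamRouter(Map<String, String> topicToStream, String defaultStream) {
        this.topicToStream = topicToStream;
        this.defaultStream = defaultStream;
    }

    // Pick the single output stream for a record based on its topic.
    public String streamFor(String topic) {
        return topicToStream.getOrDefault(topic, defaultStream);
    }

    public static void main(String[] args) {
        TopicStreamRouter router = new TopicStreamRouter(
            Map.of("topic1", "stream1", "topic2", "stream2"), "default");
        System.out.println(router.streamFor("topic1")); // stream1
        System.out.println(router.streamFor("unmapped")); // default
    }
}
```

Because the routing is a pure topic -> stream lookup, each record still maps to exactly one tuple and one message ID, so the existing ack/fail tracking is unaffected.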

allow defining multiple output streams in ConsumerRecordTransformer
@simingweng
Contributor Author

@nwangtw @rohanag12 I've just pushed one more commit to add support for multiple output streams; the spout is now also able to support EFFECTIVE_ONCE mode.

@nwangtw
Contributor

nwangtw commented Mar 20, 2019

(quoting @simingweng's summary above of the topic-to-stream use case)

I see. So the N -> 1 and N -> N cases, not the 1 -> N case. Thanks.

@nwangtw nwangtw merged commit f5cbfd5 into apache:master Apr 1, 2019
sreev pushed a commit to sreev/incubator-heron that referenced this pull request Apr 9, 2020
* initial commit

* remove dependency on heron-storm compatibility library

* insert Copyright header

* use constant in ConsumerConfig for client configuration
add sample topology

* change to default Kafka Broker port number

* add Javadoc documentation

* add documentation 1

* include unit tests for heron kafka spout in Travis

* add documentation 2

* support EFFECTIVE_ONCE mode
allow defining multiple output streams in ConsumerRecordTransformer
nicknezis pushed a commit that referenced this pull request Sep 14, 2020
* initial commit

* remove dependency on heron-storm compatibility library

* insert Copyright header

* use constant in ConsumerConfig for client configuration
add sample topology

* change to default Kafka Broker port number

* add Javadoc documentation

* add documentation 1

* include unit tests for heron kafka spout in Travis

* add documentation 2

* support EFFECTIVE_ONCE mode
allow defining multiple output streams in ConsumerRecordTransformer