-
Notifications
You must be signed in to change notification settings - Fork 4.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
STORM-2236: Kafka Spout with manual partition management. #1835
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This looks good to me, barring a few nits and the caught Exception in KafkaSpout. Good work :)
import java.util.HashSet; | ||
|
||
/** | ||
* Created by liurenjie on 19/12/2016. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please remove these.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed.
@@ -120,6 +124,8 @@ public void open(Map conf, TopologyContext context, SpoutOutputCollector collect | |||
commitTimer = new Timer(500, kafkaSpoutConfig.getOffsetsCommitPeriodMs(), TimeUnit.MILLISECONDS); | |||
} | |||
|
|||
// Manual partition assignment |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This doesn't seem like it belongs here anymore
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed.
@@ -75,6 +78,7 @@ | |||
private transient KafkaSpoutRetryService retryService; // Class that has the logic to handle tuple failure | |||
private transient Timer commitTimer; // timer == null for auto commit mode | |||
private transient boolean initialized; // Flag indicating that the spout is still undergoing initialization process. | |||
private transient KafkaRecordsFetcher<K, V> recordsFetcher; // Class that encapsulate the log of partition management |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't understand the comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry, that's a typo. I've fixed in the new patch.
try { | ||
setWaitingToEmit(pollKafkaBroker()); | ||
} catch (Exception e) { | ||
LOG.error("Failed to poll from kafka.", e); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you share what exception you're seeing here? We haven't seen the spout be unstable due to exceptions from here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We have encountered many exceptions: KafkaFetchException, LeaderNoFoundException, etc. These are caused by temporarily kafka partition migration. I don't think we should restart the process in these cases.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Right, that makes sense. We're not getting them because I think automatic assignment avoids them. I think it's fine to catch them, but you should replace the Exception with the specific exception classes you're seeing.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As kafka client's doc says, I've changed the exception to KafkaException.
doRefreshMyPartitions(); | ||
} | ||
|
||
private void refreshMyPartitionsIfNeed() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: Should be IfNeeded
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed.
} | ||
|
||
public static PartitionAssignmentChangeListener listenerOf(final ConsumerRebalanceListener consumerRebalanceListener) { | ||
return new PartitionAssignmentChangeListener() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You don't have to change it, but on master we're targeting Java 8, so lambdas are an option for this kind of thing :)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Cool, I also love lambda.
|
||
import java.util.List; | ||
|
||
public interface KafkaPartitionReader { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: Some of these classes could probably go in the internal package, or a new subpackage of the internal package.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Moved to another package.
@hmcl You might want to take a look at this too |
setWaitingToEmit(pollKafkaBroker()); | ||
try { | ||
setWaitingToEmit(pollKafkaBroker()); | ||
} catch (KafkaException e) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think you should catch https://kafka.apache.org/0100/javadoc/org/apache/kafka/common/errors/RetriableException.html instead, it seems to cover all the exceptions that are expected to be transient (leader change for example, through InvalidMetadataException). Catching KafkaException will break interrupt handling (see #1821) because InterruptException is an instance of KafkaException.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, I've fixed it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@srdo good point, but the main point is also that the code that is handling the interrupt exceptions is very fragile. Right now you were able to catch this, but it's very difficult for other devs to code accounting for that.
@liurenjie1024 we are basically catching and logging the exception. Is that really the intent? Is there a scenario that we should not catch the exception, in which we would rather let the process die, and then restart ? The way the code currently works, if no new records are polled (i.e. an exception occurred) waitingToEmit
will always be false.
@liurenjie1024 once the review is complete and it incorporates the feedback from all the reviewers (I am going through the review now), please squash the commits into one commit, and make sure that this commit has the title of the JIRA it is addressing, as well as a summary of its contents. |
import java.util.List; | ||
import java.util.regex.Pattern; | ||
|
||
/** |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
please remove author names and add a apache license on top of the file
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed.
KafkaConsumer<K, V> consumer, | ||
TopologyContext context, | ||
ConsumerRebalanceListener rebalanceListener) { | ||
if (kafkaSpoutConfig.getManualPartitionAssignment()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we need to check these flags. i.e you are already implementing an interface KafkaRecordsFetcher and any code related to Manual or Automatic should go into respective classes. User can configure one or the other . Instead of checking if its manual or not here we should be doing with in the record fetchers .
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think just a flag is easier to use since the user has to write code to configure it otherwise, especially in the case of using flux to configure storm topology. Though your way is more flexible but I don't think we need the flexibility here since it's an internal implementation and the user do not need to know much about that. Let the user assign the implementation class should be used in cases like tuple builder.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@liurenjie1024 internal details or not we will keep adding one implementation or another so it's better to make this cleaner when we have an opportunity to do so.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@liurenjie1024 irrespective of if it's an internal detail or not we should go with the right approach. We have interfaces for a reason. If we keep adding implementations on how the records are being fetched we need to keep adding these if conditions. Given we are already making changes lets go with the right approach.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this is a factory method. Perhaps you could call this method create(...) or newInstance(...), or something along those lines.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
- The if conditions are wrapped in a factory method so adding more cases will not pollute other components.
- The construction of instances can only happen in the open method since it needs topology context.
- Interface is used to hide the client from implementation details and I don't think our approach here violated the rule.
+1 once the merge conflict is resolved. It would probably be good to add some unit tests of the new classes, but I think this can be merged without them. |
@srdo Do I have the access to resolve the conflicts? Or it should be resolved by those who merge the code? |
@liurenjie1024 You have access. You need to open your branch in the Git client you usually use and pull the latest commits from master on this repo (https://github.com/apache/storm.git). If you like you can use Here's an example of how it can be done from command line (I have a remote called "upstream" that points to this repository, if you don't have one like it use
|
@liurenjie1024 @srdo lets wait till all the pending comments are resolved. |
import org.apache.storm.kafka.spout.KafkaSpout; | ||
import org.apache.storm.kafka.spout.TopicPartitionComparator; | ||
|
||
import java.util.*; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please remove wildcard imports
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed.
return partitionRefreshPeriodMs; | ||
} | ||
|
||
public boolean getManualPartitionAssignment() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
isManualPartitionAssignment()
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed.
@@ -61,6 +64,7 @@ | |||
|
|||
// Storm | |||
protected SpoutOutputCollector collector; | |||
private TopologyContext topologyContext; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, it's used to create records fetcher.
@@ -591,7 +589,7 @@ public String toString() { | |||
|
|||
// =========== Timer =========== | |||
|
|||
private class Timer { | |||
public static class Timer { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think that we should make Timer it's own class, rather than a static inner class, since it's used in other objects as well.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed.
|
||
private void doRefreshMyPartitions() { | ||
List<TopicPartition> topicPartitions = partitionReader.readPartitions(consumer); | ||
Collections.sort(topicPartitions, KAFKA_TOPIC_PARTITION_COMPARATOR); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why is this sort necessary ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This sort ensures that all instances of KafkaSpout see the same ordered list so that the calculation is correct.
import java.util.HashSet; | ||
|
||
public final class KafkaPartitionReaders { | ||
public static KafkaPartitionReader partitionReaderOf(KafkaSpoutStreams kafkaSpoutStreams) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
these names are very misleading. Can you please call this create or newInstance?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed.
107a33a
to
69bfe43
Compare
import java.util.concurrent.TimeUnit; | ||
|
||
/** | ||
* Created by liurenjie on 22/12/2016. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You should really just edit the template in your IDE :)
I'm making the same change here https://github.com/apache/storm/pull/1832/files#diff-73306e32282def8d045e0bcd2d33f59b btw.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry for that. I've fixed it. I'll merge your change later after you submitted it.
Hi, all: |
Please copy the Apache license into all the new files. You can just grab it from the top of KafkaSpout. |
kafkaSpoutConfig.getPartitionRefreshPeriodMs(), TimeUnit.MILLISECONDS); | ||
|
||
ManualKafkaRecordsFetcher.PartitionAssignmentChangeListener partitionAssignmentChangeListener = null; | ||
if (rebalanceListener != null) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Won't this cause an NPE in ManualKafkaRecordsFetcher if the null case is hit here? Why does it make sense to call create without a rebalance listener?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I can't figure out a case for a null rebalance listener now, so this maybe an overdesign here.
LGTM. +1 |
@liurenjie1024 thanks for the updates. Overall looks good to me. Can you squash your commits into one commit. +1 after that. |
} | ||
} | ||
|
||
public static TopicPartition convert(PartitionInfo partitionInfo) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
call this toTopicPartition
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed.
+1. Just one minor nit on a name. can you please squash the commits and put a little summary of the patch in the commit message. Thanks. |
320f2e9
to
00f5408
Compare
Hi, all: |
Any update about the merge process of this patch? |
@liurenjie1024 thanks for you patience. Merged into master. I would like to merge this into 1.x-branch as well. You are using FunctionalInterface and 1.x branch we are still on java 7 . Do you want to change that open another PR for 1.x-branch? |
@harshach OK I'll submit a PR for that later. |
As the title says.