
STORM-2236 storm kafka client support manual partition management. #1825

Closed
wants to merge 3 commits

Conversation

liurenjie1024
Contributor

  1. Support manual partition assignment, since Kafka-managed partition assignment may cause unnecessary rebalances if the consumer doesn't keep polling.
  2. A small change to fix STORM-2077, which may otherwise lose failed messages.

@srdo
Contributor

srdo commented Dec 14, 2016

Hi. STORM-2077 is being resolved here https://issues.apache.org/jira/browse/STORM-2087. Sorry for the inconvenience.

I'm not sure I understand why adding manual partition management is necessary. Kafka has recently added heartbeating from a background thread, so it's no longer necessary to poll often in order to avoid rebalances (see http://kafka.apache.org/documentation.html#upgrade_1010_notable, search for "heartbeat"). That change decouples necessary poll frequency from the time it would take to detect a crashed consumer, so the max.poll.interval.ms can be bumped if a rebalance every 5 minutes is unacceptable (and then only if poll isn't called in that interval).
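To illustrate the knob mentioned above, a minimal sketch of the consumer configuration (the keys are real consumer configs; the values are arbitrary examples, not recommendations):

```java
import java.util.Properties;

class ConsumerPollConfig {
    // Builds consumer properties with a raised max.poll.interval.ms so a
    // slow-polling consumer is not evicted from the group between polls.
    // Since Kafka 0.10.1 a background thread sends heartbeats, so
    // session.timeout.ms (crash detection) is decoupled from how often
    // poll() is actually called.
    static Properties withRaisedPollInterval(long pollIntervalMs) {
        Properties props = new Properties();
        props.put("max.poll.interval.ms", Long.toString(pollIntervalMs));
        props.put("session.timeout.ms", "10000"); // crash detection stays fast
        return props;
    }
}
```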

Could you describe a use case?

@liurenjie1024
Contributor Author

I think the behavior of the consumer's automatic partition management is complicated and may lead to unpredictable behavior. Predictability is quite important for many purposes like debugging and performance tuning, especially in critical systems such as the billing system in our case. In fact, even the authors of the kafka consumer suggest using manual partition management with stream processing frameworks (please refer to the Manual Partition Assignment section of the [kafka consumer doc](http://kafka.apache.org/0101/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html)). Other stream processing frameworks like Flink, Apex, and Spark Streaming, as well as the old storm-kafka client, also use manual partition management.
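To make the idea concrete, manual assignment boils down to each spout task deterministically picking its own partitions and calling `KafkaConsumer#assign` instead of `subscribe`. A minimal sketch of the distribution step (the class name and round-robin scheme are illustrative, not the actual patch):

```java
import java.util.ArrayList;
import java.util.List;

class PartitionDistributor {
    // Returns the partition ids (0..partitionCount-1) owned by this task,
    // distributed round-robin across all spout tasks. Because the mapping
    // depends only on (partitionCount, taskIndex, totalTasks), every task
    // computes the same answer with no coordination, and the result stays
    // stable as long as the task count does not change.
    static List<Integer> partitionsForTask(int partitionCount, int taskIndex, int totalTasks) {
        List<Integer> owned = new ArrayList<>();
        for (int p = taskIndex; p < partitionCount; p += totalTasks) {
            owned.add(p);
        }
        return owned;
    }
}
```

Each task would then wrap its ids in `TopicPartition` objects and hand them to `KafkaConsumer#assign`, sidestepping the group coordinator entirely.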

@hmcl
Contributor

hmcl commented Dec 15, 2016

@liurenjie1024 putting aside for a second the discussion about the need to implement partition assignment: if we indeed agree that we should support it, I think that the KafkaSpout class is getting a bit too bloated to handle some of these separate requirements. In a sense this is a new feature on its own. I would favor a more OOD approach if possible, perhaps subclassing or composing the KafkaSpout. @srdo @revans2 what's your opinion?

@liurenjie1024
Contributor Author

Maybe we do not need to maintain two versions of KafkaSpout with the same functionality. The main goal of automatic partition management is to ease fault tolerance in simple applications, not in frameworks like storm. Automatic partition management may hide extra overhead and unpredictable behavior that is unnecessary in a framework like storm.

@srdo
Contributor

srdo commented Dec 15, 2016

@liurenjie1024 Okay, it makes sense that having Kafka manage partitions isn't really necessary since Storm will make sure to keep spout instances running. I can't speak to how much of an overhead and unpredictability there really is to making Kafka manage it though, but it's fine by me if we want to switch to manual management. I can't think of a good reason for supporting both manual and automatic assignment though.

The reason the old Kafka spout did manual assignment was that automatic assignment wasn't available on the old APIs as far as I know.

I agree that we shouldn't have two variants of the spout, it just adds unnecessary complexity. I'd rather we just do a solid manual assignment implementation and get rid of the automatic code, if we need to switch.

Keep in mind that to be on par with automatic assignment, manual assignment needs to support adding partitions without requiring a spout restart, and without the spout instances stepping on each other's toes (e.g. avoid partitions being assigned to two spout instances at once, even while reassigning).

@hmcl Sure, it's more readable if strategies like this can be sectioned off in their own classes. If we really have to support both, I think it's nicer if assignment can be implemented through delegates rather than through subclassing.
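A delegate-based version might look roughly like the sketch below; the interface and class names are hypothetical, not from the patch:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical strategy interface: the spout delegates partition assignment
// to an implementation instead of hardcoding manual vs. automatic behavior.
interface PartitionAssignmentStrategy {
    // Returns the partition ids this task should consume.
    List<Integer> assign(int partitionCount, int taskIndex, int totalTasks);
}

// One concrete strategy: modulo distribution, stable for a fixed task count.
class ModuloAssignmentStrategy implements PartitionAssignmentStrategy {
    @Override
    public List<Integer> assign(int partitionCount, int taskIndex, int totalTasks) {
        List<Integer> owned = new ArrayList<>();
        for (int p = 0; p < partitionCount; p++) {
            if (p % totalTasks == taskIndex) {
                owned.add(p);
            }
        }
        return owned;
    }
}
```

The spout would hold a `PartitionAssignmentStrategy` field set from the config, keeping the assignment policy swappable without subclassing the spout itself.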

@liurenjie1024
Contributor Author

@srdo I'll submit a patch for that later.


@srdo srdo left a comment


I think most of this looks good.

@@ -59,18 +60,24 @@

// Storm
protected SpoutOutputCollector collector;
protected int thisTaskIndex;
Contributor

These could be private

Contributor Author

Fixed.

@@ -236,6 +257,35 @@ private boolean poll() {
return poll;
}

private void refreshPartitionIfNeeded() {
Contributor

Nit: Rename to something more precise, like refreshAssignedPartitionsIfNeeded

Contributor Author

Fixed in the new PR.

if (waitingToEmit()) {
emit();
}
} catch (Exception e) {
Contributor

Why are you catching this? If the spout encounters some unexpected exception, we want it to crash I think.

Contributor Author

It's not uncommon to encounter an exception when the kafka consumer polls data from the broker, and I don't think we should restart the spout in that case. But I should not have wrapped the whole function in the catch. This has been resolved in the new patch.

// Bookkeeping
private transient int maxRetries; // Max number of times a tuple is retried
private transient FirstPollOffsetStrategy firstPollOffsetStrategy; // Strategy to determine the fetch offset of the first realized by the spout upon activation
private transient KafkaSpoutRetryService retryService; // Class that has the logic to handle tuple failure
private transient Timer commitTimer; // timer == null for auto commit mode
private transient Timer partitionRefreshTimer; // partitionRefreshTime != null if in manual partition assign model
private transient boolean manualPartitionAssignment;
Contributor

Nit: Rename to manualPartitionAssignmentEnabled here and in KafkaSpoutConfig

@@ -236,6 +257,35 @@ private boolean poll() {
return poll;
}

private void refreshPartitionIfNeeded() {
if (!manualPartitionAssignment || !partitionRefreshTimer.isExpiredResetOnTrue()) return;
Contributor

Please add braces to this if

Contributor Author

Fixed in the new PR.

/**
* Defines whether the consumer manages partition manually.
* If set to true, the consumer behaves like a simple consumer, otherwise it will rely on kafka to do partition assignment.
* @param manualPartitionAssign Whether use manual partition assignment.
Contributor

Nit: "True if using manual partition assignment" is clearer IMO

Contributor Author

Fixed in the new PR.

* Validate configs before build.
*/
private void validate() {
if (this.manualPartitionAssign && kafkaSpoutStreams instanceof KafkaSpoutStreamsWildcardTopics) {
Contributor

Probably better to check that kafkaSpoutStreams is an instance of KafkaSpoutStreamsNamedTopics, so this doesn't break if more subclasses are added at some point.

Contributor Author

Fixed in the new PR.

*/
private void validate() {
if (this.manualPartitionAssign && kafkaSpoutStreams instanceof KafkaSpoutStreamsWildcardTopics) {
throw new IllegalArgumentException("Manual partition assign can't be used with wildcard topics!");
Contributor

assign -> assignment

Contributor Author

Fixed in the new PR.

import java.util.Comparator;

/**
* Created by liurenjie on 12/7/16.
Contributor

Please remove the created by comment from the files

/**
* Created by liurenjie on 12/7/16.
*/
public enum TopicPartitionComparator implements Comparator<TopicPartition> {
Contributor

Nit: I don't think there's anything wrong with making this an enum, but it seems conceptually weird to me. Why not just make this a regular class, and then make the field for it in the spout static?

Contributor Author

Fixed in the new PR.

@srdo
Contributor

srdo commented Dec 16, 2016

Btw @liurenjie1024 you need to make a PR for this against master first. We can't as far as I know merge features to 1.x before they are also on master.

@srdo
Contributor

srdo commented Dec 16, 2016

Also, I was wrong about spout instances interfering during reassignment. Since the task count is constant for the lifetime of the topology, a partition will always be assigned to the same task, so there's no issue. My bad :)

@liurenjie1024
Contributor Author

Hi @srdo, I've submitted a new patch for this issue, with some refactoring, against the master branch. I think most of your comments are valid and have addressed them in the new patch. Please help review that PR; this one will be closed.

3 participants