STORM-2236: storm-kafka-client support for manual partition management #1825

Closed (wants to merge 3 commits)
File: KafkaSpout.java
@@ -23,6 +23,7 @@
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy;
import org.apache.storm.spout.SpoutOutputCollector;
@@ -59,18 +60,24 @@ public class KafkaSpout<K, V> extends BaseRichSpout {

// Storm
protected SpoutOutputCollector collector;
protected int thisTaskIndex;
[Contributor] These could be private.
[Contributor Author] Fixed.

protected int taskCount;

// Kafka
private final KafkaSpoutConfig<K, V> kafkaSpoutConfig;
private transient KafkaConsumer<K, V> kafkaConsumer;
private transient boolean consumerAutoCommitMode;



// Bookkeeping
private transient int maxRetries; // Max number of times a tuple is retried
private transient FirstPollOffsetStrategy firstPollOffsetStrategy; // Strategy to determine the fetch offset of the first poll realized by the spout upon activation
private transient KafkaSpoutRetryService retryService; // Class that has the logic to handle tuple failure
private transient Timer commitTimer; // timer == null for auto commit mode
private transient Timer partitionRefreshTimer; // partitionRefreshTimer != null if in manual partition assignment mode
private transient boolean manualPartitionAssignment;
[Contributor] Nit: Rename to manualPartitionAssignmentEnabled here and in KafkaSpoutConfig.

private transient KafkaSpoutConsumerRebalanceListener partitionRebalanceListener;
private transient boolean initialized; // Flag indicating that the spout is still undergoing initialization process.
// Initialization is only complete after the first call to KafkaSpoutConsumerRebalanceListener.onPartitionsAssigned()

@@ -94,12 +101,16 @@ public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {

// Spout internals
this.collector = collector;
thisTaskIndex = context.getThisTaskIndex();
taskCount = context.getComponentTasks(context.getThisComponentId()).size();
maxRetries = kafkaSpoutConfig.getMaxTupleRetries();
numUncommittedOffsets = 0;

// Offset management
firstPollOffsetStrategy = kafkaSpoutConfig.getFirstPollOffsetStrategy();
consumerAutoCommitMode = kafkaSpoutConfig.isConsumerAutoCommitMode();
manualPartitionAssignment = kafkaSpoutConfig.isManualPartitionAssign();
partitionRebalanceListener = new KafkaSpoutConsumerRebalanceListener();

// Retries management
retryService = kafkaSpoutConfig.getRetryService();
@@ -111,6 +122,10 @@ public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
commitTimer = new Timer(500, kafkaSpoutConfig.getOffsetsCommitPeriodMs(), TimeUnit.MILLISECONDS);
}

if (manualPartitionAssignment) {
partitionRefreshTimer = new Timer(500, kafkaSpoutConfig.getPartitionRefreshPeriodMs(), TimeUnit.MILLISECONDS);
}

acked = new HashMap<>();
emitted = new HashSet<>();
waitingToEmit = Collections.emptyListIterator();
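
Both timers above gate periodic work inside nextTuple() on a fixed period via isExpiredResetOnTrue(). A minimal sketch of that expire-and-reset behavior, assuming only what the diff shows (the class name here is illustrative; this is not Storm's actual internal Timer implementation):

import java.util.concurrent.TimeUnit;

// Illustrative expire-and-reset timer; not Storm's actual internal Timer class.
public class PeriodTimer {
    private final long periodMs;
    private long nextExpiryMs;

    public PeriodTimer(long delay, long period, TimeUnit unit) {
        this.periodMs = unit.toMillis(period);
        this.nextExpiryMs = System.currentTimeMillis() + unit.toMillis(delay); // first expiry after the initial delay
    }

    // Returns true at most once per period and restarts the countdown when it fires.
    public boolean isExpiredResetOnTrue() {
        long now = System.currentTimeMillis();
        if (now >= nextExpiryMs) {
            nextExpiryMs = now + periodMs;
            return true;
        }
        return false;
    }
}
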
@@ -200,16 +215,22 @@ private void setAcked(TopicPartition tp, long fetchOffset) {
@Override
public void nextTuple() {
if (initialized) {
if (commit()) {
commitOffsetsForAckedTuples();
}
try {
refreshPartitionIfNeeded();

if (poll()) {
setWaitingToEmit(pollKafkaBroker());
}
if (commit()) {
commitOffsetsForAckedTuples();
}

if (waitingToEmit()) {
emit();
if (poll()) {
setWaitingToEmit(pollKafkaBroker());
}

if (waitingToEmit()) {
emit();
}
} catch (Exception e) {
[Contributor] Why are you catching this? If the spout encounters some unexpected exception, we want it to crash, I think.
[Contributor Author] It's not uncommon to encounter an exception when the Kafka consumer polls data from the broker, and I don't think we should restart the spout in that case. But I should not have caught the whole function body. This has been resolved in the new patch.

LOG.error("Failed to emit tuples.", e);
}
} else {
LOG.debug("Spout not initialized. Not sending tuples until initialization completes");
@@ -236,6 +257,35 @@ private boolean poll() {
return poll;
}

private void refreshPartitionIfNeeded() {
[Contributor] Nit: Rename to something more precise, like refreshAssignedPartitionsIfNeeded.
[Contributor Author] Fixed in the new PR.

if (!manualPartitionAssignment || !partitionRefreshTimer.isExpiredResetOnTrue()) return;
[Contributor] Please add braces to this if.
[Contributor Author] Fixed in the new PR.

doRefreshPartitions();
}

private void doRefreshPartitions() {
KafkaSpoutStreamsNamedTopics streams = KafkaSpoutStreamsNamedTopics.class.cast(kafkaSpoutStreams);
[Contributor] I think you can still support wildcard topics via KafkaConsumer.listTopics. If it isn't added in this PR, we should make an issue for it at least.
Also nit: Why are you using class.cast instead of a C-style cast?
[Contributor Author]
  1. Wildcard topic support has been added.
  2. class.cast is preferred because it throws an exception that explains the failure better than an NPE would.
[Contributor] Doing "(KafkaSpoutStreamsNamedTopics) streams" throws a ClassCastException if streams isn't assignable to the target type as far as I know, not an NPE? Doesn't really matter, I was just curious :)

List<PartitionInfo> partitions = KafkaUtils.readPartitions(kafkaConsumer, streams.getTopics());
List<TopicPartition> tps = new ArrayList<>(partitions.size());
for (PartitionInfo info: partitions) {
tps.add(new TopicPartition(info.topic(), info.partition()));
}

Collections.sort(tps, TopicPartitionComparator.INSTANCE);

Set<TopicPartition> myPartitions = new HashSet<>(tps.size()/taskCount + 1);
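// Round-robin split: e.g. with 8 partitions and 3 tasks, task 0 takes tps[0], tps[3], tps[6];
// task 1 takes tps[1], tps[4], tps[7]; and task 2 takes tps[2], tps[5].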
for (int i = thisTaskIndex; i < tps.size(); i += taskCount) {
myPartitions.add(tps.get(i));
}

Set<TopicPartition> originalPartitions = kafkaConsumer.assignment();

if (!originalPartitions.equals(myPartitions)) {
partitionRebalanceListener.onPartitionsRevoked(originalPartitions);
partitionRebalanceListener.onPartitionsAssigned(myPartitions);
kafkaConsumer.assign(myPartitions);
}
}
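
For reference on the wildcard discussion above: wildcard support could plausibly be built on KafkaConsumer.listTopics(), which returns partition metadata for every topic visible to the consumer. A hedged sketch, assuming the spout's existing imports (the helper name is illustrative and not part of this PR):

// Illustrative helper: resolve a wildcard pattern to partitions via listTopics(); not part of this PR.
private List<PartitionInfo> partitionsMatching(Pattern topicPattern) {
    List<PartitionInfo> result = new ArrayList<>();
    for (Map.Entry<String, List<PartitionInfo>> entry : kafkaConsumer.listTopics().entrySet()) {
        if (topicPattern.matcher(entry.getKey()).matches()) {
            result.addAll(entry.getValue());
        }
    }
    return result;
}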

private boolean waitingToEmit() {
return waitingToEmit != null && waitingToEmit.hasNext();
}
@@ -262,11 +312,12 @@ private void doSeekRetriableTopicPartitions() {
final Set<TopicPartition> retriableTopicPartitions = retryService.retriableTopicPartitions();

for (TopicPartition rtp : retriableTopicPartitions) {
final OffsetAndMetadata offsetAndMeta = acked.get(rtp).findNextCommitOffset();
if (offsetAndMeta != null) {
kafkaConsumer.seek(rtp, offsetAndMeta.offset() + 1); // seek to the next offset that is ready to commit in next commit cycle
} else {
kafkaConsumer.seekToEnd(toArrayList(rtp)); // Seek to last committed offset
KafkaSpout.OffsetEntry entry = acked.get(rtp);
if (entry != null) {
[Contributor] I'd like it if you could revert the commit for fixing STORM-2077, since a different fix is being ported back from master.
[Contributor Author] Fixed in the new PR.

final OffsetAndMetadata offsetAndMeta = entry.findNextCommitOffset();
if (offsetAndMeta != null) {
kafkaConsumer.seek(rtp, offsetAndMeta.offset() + 1); // seek to the next offset that is ready to commit in next commit cycle
}
}
}
}
@@ -360,18 +411,22 @@ private void subscribeKafkaConsumer() {
kafkaConsumer = new KafkaConsumer<>(kafkaSpoutConfig.getKafkaProps(),
kafkaSpoutConfig.getKeyDeserializer(), kafkaSpoutConfig.getValueDeserializer());

if (kafkaSpoutStreams instanceof KafkaSpoutStreamsNamedTopics) {
final List<String> topics = ((KafkaSpoutStreamsNamedTopics) kafkaSpoutStreams).getTopics();
kafkaConsumer.subscribe(topics, new KafkaSpoutConsumerRebalanceListener());
LOG.info("Kafka consumer subscribed topics {}", topics);
} else if (kafkaSpoutStreams instanceof KafkaSpoutStreamsWildcardTopics) {
final Pattern pattern = ((KafkaSpoutStreamsWildcardTopics) kafkaSpoutStreams).getTopicWildcardPattern();
kafkaConsumer.subscribe(pattern, new KafkaSpoutConsumerRebalanceListener());
LOG.info("Kafka consumer subscribed topics matching wildcard pattern [{}]", pattern);
if (manualPartitionAssignment) {
doRefreshPartitions();
[Contributor] Nit, but it would probably be good to log which topic is subscribed here as well.
[Contributor Author] Fixed in the new PR.

} else {
if (kafkaSpoutStreams instanceof KafkaSpoutStreamsNamedTopics) {
final List<String> topics = ((KafkaSpoutStreamsNamedTopics) kafkaSpoutStreams).getTopics();
kafkaConsumer.subscribe(topics, partitionRebalanceListener);
LOG.info("Kafka consumer subscribed topics {}", topics);
} else if (kafkaSpoutStreams instanceof KafkaSpoutStreamsWildcardTopics) {
final Pattern pattern = ((KafkaSpoutStreamsWildcardTopics) kafkaSpoutStreams).getTopicWildcardPattern();
kafkaConsumer.subscribe(pattern, partitionRebalanceListener);
LOG.info("Kafka consumer subscribed topics matching wildcard pattern [{}]", pattern);
}
// Initial poll to get the consumer registration process going.
// KafkaSpoutConsumerRebalanceListener will be called following this poll, upon partition registration
kafkaConsumer.poll(0);
}
// Initial poll to get the consumer registration process going.
// KafkaSpoutConsumerRebalanceListener will be called following this poll, upon partition registration
kafkaConsumer.poll(0);
}

@Override
File: KafkaSpoutConfig.java
@@ -36,6 +36,7 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
public static final long DEFAULT_OFFSET_COMMIT_PERIOD_MS = 30_000; // 30s
public static final int DEFAULT_MAX_RETRIES = Integer.MAX_VALUE; // Retry forever
public static final int DEFAULT_MAX_UNCOMMITTED_OFFSETS = 10_000_000; // 10,000,000 records => 80MBs of memory footprint in the worst case
public static final int DEFAULT_PARTITION_REFRESH_PERIOD_MS = 2_000; // 2s

// Kafka property names
public interface Consumer {
@@ -73,9 +74,11 @@ public enum FirstPollOffsetStrategy {
private final long pollTimeoutMs;

// Kafka spout configuration
private final boolean manualPartitionAssign;
private final long offsetCommitPeriodMs;
private final int maxRetries;
private final int maxUncommittedOffsets;
private final int partitionRefreshPeriodMs;
private final FirstPollOffsetStrategy firstPollOffsetStrategy;
private final KafkaSpoutStreams kafkaSpoutStreams;
private final KafkaSpoutTuplesBuilder<K, V> tuplesBuilder;
@@ -93,6 +96,8 @@ private KafkaSpoutConfig(Builder<K,V> builder) {
this.maxUncommittedOffsets = builder.maxUncommittedOffsets;
this.tuplesBuilder = builder.tuplesBuilder;
this.retryService = builder.retryService;
this.manualPartitionAssign = builder.manualPartitionAssign;
this.partitionRefreshPeriodMs = builder.partitionRefreshPeriodMs;
}

private Map<String, Object> setDefaultsAndGetKafkaProps(Map<String, Object> kafkaProps) {
@@ -113,6 +118,8 @@ public static class Builder<K,V> {
private FirstPollOffsetStrategy firstPollOffsetStrategy = FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST;
private final KafkaSpoutStreams kafkaSpoutStreams;
private int maxUncommittedOffsets = DEFAULT_MAX_UNCOMMITTED_OFFSETS;
private boolean manualPartitionAssign = false;
private int partitionRefreshPeriodMs = DEFAULT_PARTITION_REFRESH_PERIOD_MS;
private final KafkaSpoutTuplesBuilder<K, V> tuplesBuilder;
private final KafkaSpoutRetryService retryService;

@@ -230,8 +237,37 @@ public Builder<K, V> setFirstPollOffsetStrategy(FirstPollOffsetStrategy firstPollOffsetStrategy) {
}

public KafkaSpoutConfig<K,V> build() {
validate();
return new KafkaSpoutConfig<>(this);
}

/**
* Defines whether the consumer manages partitions manually.
* If set to true, the consumer behaves like a simple consumer; otherwise it relies on Kafka to do partition assignment.
[Contributor] Consider changing this to "If set to true, the spout instances are assigned partitions round-robin", since readers may not know what it means to behave like a simple consumer.
[Contributor Author] Fixed in the new PR.

* @param manualPartitionAssign Whether to use manual partition assignment.
[Contributor] Nit: "True if using manual partition assignment" is clearer IMO.
[Contributor Author] Fixed in the new PR.

*/
public Builder setManualPartitionAssign(boolean manualPartitionAssign) {
this.manualPartitionAssign = manualPartitionAssign;
return this;
}

/**
* Defines the partition refresh period used in the manual partition assignment mode.
* @param partitionRefreshPeriodMs Partition refresh period in ms.
*/
public Builder setPartitionRefreshPeriodMs(int partitionRefreshPeriodMs) {
this.partitionRefreshPeriodMs = partitionRefreshPeriodMs;
return this;
}

/**
* Validates the configuration before building.
*/
private void validate() {
if (this.manualPartitionAssign && kafkaSpoutStreams instanceof KafkaSpoutStreamsWildcardTopics) {
[Contributor] Probably better to check that kafkaSpoutStreams is an instance of KafkaSpoutStreamsNamedTopics, so this doesn't break if more subclasses are added at some point.
[Contributor Author] Fixed in the new PR.

throw new IllegalArgumentException("Manual partition assign can't be used with wildcard topics!");
[Contributor] assign -> assignment
[Contributor Author] Fixed in the new PR.

}
}
}
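
A hedged usage sketch of the two new builder options; the Builder construction itself (kafkaProps, streams, tuplesBuilder, retryService) is assumed to happen elsewhere, and the 5-second refresh period is an arbitrary example value:

// Hedged sketch: enabling manual partition assignment on an already-constructed builder.
static <K, V> KafkaSpoutConfig<K, V> enableManualAssignment(KafkaSpoutConfig.Builder<K, V> builder) {
    builder.setManualPartitionAssign(true);      // spout tasks divide partitions among themselves round-robin
    builder.setPartitionRefreshPeriodMs(5_000);  // example value; the default is DEFAULT_PARTITION_REFRESH_PERIOD_MS (2s)
    return builder.build();                      // build() now runs validate(), which rejects manual assignment with wildcard topics
}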

public Map<String, Object> getKafkaProps() {
@@ -283,6 +319,10 @@ public Pattern getTopicWildcardPattern() {
null;
}

public boolean isManualPartitionAssign() {
return manualPartitionAssign;
}

public int getMaxTupleRetries() {
return maxRetries;
}
@@ -307,6 +347,10 @@ public KafkaSpoutRetryService getRetryService() {
return retryService;
}

public int getPartitionRefreshPeriodMs() {
return partitionRefreshPeriodMs;
}

@Override
public String toString() {
return "KafkaSpoutConfig{" +
File: KafkaUtils.java (new file)
@@ -0,0 +1,23 @@
package org.apache.storm.kafka.spout;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/**
* Created by liurenjie on 12/7/16.
*/
public final class KafkaUtils {
public static List<PartitionInfo> readPartitions(KafkaConsumer<?, ?> consumer, Iterable<String> topics) {
List<PartitionInfo> partitionInfos = new ArrayList<>();
for (String topic : topics) {
partitionInfos.addAll(consumer.partitionsFor(topic));
}

return partitionInfos;
}
}
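
One caveat, noted here as an assumption rather than an observed bug: on several kafka-clients versions, partitionsFor can return null for a topic that does not exist yet, so a defensive variant of the loop (a sketch, not part of this PR) would be:

// Defensive sketch: guard against partitionsFor returning null for a not-yet-existing topic.
for (String topic : topics) {
    List<PartitionInfo> forTopic = consumer.partitionsFor(topic);
    if (forTopic != null) {
        partitionInfos.addAll(forTopic);
    }
}
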
File: TopicPartitionComparator.java (new file)
@@ -0,0 +1,21 @@
package org.apache.storm.kafka.spout;

import org.apache.kafka.common.TopicPartition;

import java.util.Comparator;

/**
* Created by liurenjie on 12/7/16.
[Contributor] Please remove the created by comment from the files.

*/
public enum TopicPartitionComparator implements Comparator<TopicPartition> {
[Contributor] Nit: I don't think there's anything wrong with making this an enum, but it seems conceptually weird to me. Why not just make this a regular class, and then make the field for it in the spout static?
[Contributor Author] Fixed in the new PR.

INSTANCE;

@Override
public int compare(TopicPartition o1, TopicPartition o2) {
if (!o1.topic().equals(o2.topic())) {
return o1.topic().compareTo(o2.topic());
} else {
return o1.partition() - o2.partition();
}
}
}
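
For reference, the plain-class alternative the reviewer suggests might look like the following sketch (illustrative; the author's actual follow-up lives in the new PR and is not shown here; imports are the same as in the file above):

// Sketch of the reviewer's suggestion: a plain comparator class exposed through a shared static instance.
public class TopicPartitionComparator implements Comparator<TopicPartition> {
    public static final TopicPartitionComparator INSTANCE = new TopicPartitionComparator();

    @Override
    public int compare(TopicPartition o1, TopicPartition o2) {
        int byTopic = o1.topic().compareTo(o2.topic());
        return byTopic != 0 ? byTopic : Integer.compare(o1.partition(), o2.partition());
    }
}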