diff --git a/external/storm-kafka/README.md b/external/storm-kafka/README.md
index c5ed4a5311e..a8418807fd7 100644
--- a/external/storm-kafka/README.md
+++ b/external/storm-kafka/README.md
@@ -83,7 +83,7 @@ The KafkaConfig class also has bunch of public variables that controls your appl
     public int fetchMaxWait = 10000;
     public int bufferSizeBytes = 1024 * 1024;
     public MultiScheme scheme = new RawMultiScheme();
-    public boolean forceFromStart = false;
+    public boolean ignoreZkOffsets = false;
     public long startOffsetTime = kafka.api.OffsetRequest.EarliestTime();
     public long maxOffsetBehind = Long.MAX_VALUE;
     public boolean useStartOffsetTimeIfOffsetOutOfRange = true;
@@ -120,6 +120,25 @@ spoutConf.scheme = new SchemeAsMultiScheme(new StringScheme());
 OpaqueTridentKafkaSpout spout = new OpaqueTridentKafkaSpout(spoutConf);
 ```
+### How KafkaSpout stores offsets of a Kafka topic and recovers in case of failures
+
+As shown in the KafkaConfig properties above, users can control where in the topic the spout starts reading by setting **KafkaConfig.startOffsetTime**.
+
+These are the options:
+1. **kafka.api.OffsetRequest.EarliestTime() or -2 (the value returned by EarliestTime())**, which makes the KafkaSpout read from the beginning of the topic
+2. **kafka.api.OffsetRequest.LatestTime() or -1 (the value returned by LatestTime())**, which starts at the end of the topic, so only new messages written to the topic are read
+3. **System.currentTimeMillis()** (or any timestamp in milliseconds), which starts from messages written around that time
+
+When users first deploy a KafkaSpout based topology they can use one of the above options. As the topology runs,
+the KafkaSpout keeps track of the offsets it has read and writes this offset information under **SpoutConfig.zkRoot + "/" + SpoutConfig.id**.
+In case of failures it recovers from the last offset written to zookeeper.
+
+Users who deploy a topology, then kill and re-deploy it, should make sure that **SpoutConfig.id** and **SpoutConfig.zkRoot**
+remain the same; otherwise the KafkaSpout won't be able to start from the offsets stored in zookeeper.
+
+Users can set **KafkaConfig.ignoreZkOffsets** to **true** to make the KafkaSpout ignore any zookeeper based offsets
+and start from the configured **KafkaConfig.startOffsetTime**.
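+
+As a minimal usage sketch of the two points above (the zookeeper address, topic, zkRoot, and id values below are illustrative placeholders):
+
+```java
+BrokerHosts hosts = new ZkHosts("localhost:2181");
+// Keep zkRoot and id stable across redeployments so the spout can resume
+// from the offsets it committed under zkRoot + "/" + id in zookeeper.
+SpoutConfig spoutConfig = new SpoutConfig(hosts, "test-topic", "/kafkastorm", "discovery");
+spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
+// Ignore any offsets already stored in zookeeper and always start from
+// the configured startOffsetTime (here, the beginning of the topic).
+spoutConfig.ignoreZkOffsets = true;
+spoutConfig.startOffsetTime = kafka.api.OffsetRequest.EarliestTime();
+KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
+```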
+
 ## Using storm-kafka with different versions of Scala
 
 Storm-kafka's Kafka dependency is defined as `provided` scope in maven, meaning it will not be pulled in
diff --git a/external/storm-kafka/src/jvm/storm/kafka/KafkaConfig.java b/external/storm-kafka/src/jvm/storm/kafka/KafkaConfig.java
index 5c85983f379..dd71b5a2151 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/KafkaConfig.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/KafkaConfig.java
@@ -33,7 +33,7 @@ public class KafkaConfig implements Serializable {
     public int fetchMaxWait = 10000;
     public int bufferSizeBytes = 1024 * 1024;
     public MultiScheme scheme = new RawMultiScheme();
-    public boolean forceFromStart = false;
+    public boolean ignoreZkOffsets = false;
     public long startOffsetTime = kafka.api.OffsetRequest.EarliestTime();
     public long maxOffsetBehind = Long.MAX_VALUE;
     public boolean useStartOffsetTimeIfOffsetOutOfRange = true;
diff --git a/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java b/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java
index 137dc99a0d6..b01803227cc 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java
@@ -60,10 +60,7 @@ public static IBrokerReader makeBrokerReader(Map stormConf, KafkaConfig conf) {
 
     public static long getOffset(SimpleConsumer consumer, String topic, int partition, KafkaConfig config) {
-        long startOffsetTime = kafka.api.OffsetRequest.LatestTime();
-        if ( config.forceFromStart ) {
-            startOffsetTime = config.startOffsetTime;
-        }
+        long startOffsetTime = config.startOffsetTime;
         return getOffset(consumer, topic, partition, startOffsetTime);
     }
 
diff --git a/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java b/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java
index 630c7f6de64..00ab981e0b4 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java
@@ -91,9 +91,9 @@ public PartitionManager(DynamicPartitionConnections connections, String topology
             if (jsonTopologyId == null || jsonOffset == null) { // failed to parse JSON?
                 _committedTo = currentOffset;
                 LOG.info("No partition information found, using configuration to determine offset");
-            } else if (!topologyInstanceId.equals(jsonTopologyId) && spoutConfig.forceFromStart) {
+            } else if (!topologyInstanceId.equals(jsonTopologyId) && spoutConfig.ignoreZkOffsets) {
                 _committedTo = KafkaUtils.getOffset(_consumer, spoutConfig.topic, id.partition, spoutConfig.startOffsetTime);
-                LOG.info("Topology change detected and reset from start forced, using configuration to determine offset");
+                LOG.info("Topology change detected and ignoreZkOffsets set to true, using configuration to determine offset");
             } else {
                 _committedTo = jsonOffset;
                 LOG.info("Read last commit offset from zookeeper: " + _committedTo + "; old topology_id: " + jsonTopologyId + " - new topology_id: " + topologyInstanceId );
diff --git a/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaEmitter.java b/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaEmitter.java
index 1a9be43d1e0..61c79a52fa5 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaEmitter.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaEmitter.java
@@ -100,7 +100,7 @@ private Map doEmitNewPartitionBatch(SimpleConsumer consumer, Partition partition
         if (lastTopoMeta != null) {
             lastInstanceId = (String) lastTopoMeta.get("id");
         }
-        if (_config.forceFromStart && !_topologyInstanceId.equals(lastInstanceId)) {
+        if (_config.ignoreZkOffsets && !_topologyInstanceId.equals(lastInstanceId)) {
             offset = KafkaUtils.getOffset(consumer, _config.topic, partition.partition, _config.startOffsetTime);
         } else {
             offset = (Long) lastMeta.get("nextOffset");
@@ -157,7 +157,7 @@ private ByteBufferMessageSet fetchMessages(SimpleConsumer consumer, Partition pa
     private void reEmitPartitionBatch(TransactionAttempt attempt, TridentCollector collector, Partition partition, Map meta) {
         LOG.info("re-emitting batch, attempt " + attempt);
         String instanceId = (String) meta.get("instanceId");
-        if (!_config.forceFromStart || instanceId.equals(_topologyInstanceId)) {
+        if (!_config.ignoreZkOffsets || instanceId.equals(_topologyInstanceId)) {
             SimpleConsumer consumer = _connections.register(partition);
             long offset = (Long) meta.get("offset");
             long nextOffset = (Long) meta.get("nextOffset");
diff --git a/external/storm-kafka/src/test/storm/kafka/KafkaUtilsTest.java b/external/storm-kafka/src/test/storm/kafka/KafkaUtilsTest.java
index 1f1bbbcd97e..965eaeae609 100644
--- a/external/storm-kafka/src/test/storm/kafka/KafkaUtilsTest.java
+++ b/external/storm-kafka/src/test/storm/kafka/KafkaUtilsTest.java
@@ -110,17 +110,17 @@ public void fetchMessagesWithInvalidOffsetAndDefaultHandlingEnabled() throws Exc
 
     @Test
     public void getOffsetFromConfigAndDontForceFromStart() {
-        config.forceFromStart = false;
+        config.ignoreZkOffsets = false;
         config.startOffsetTime = OffsetRequest.EarliestTime();
         createTopicAndSendMessage();
-        long latestOffset = KafkaUtils.getOffset(simpleConsumer, config.topic, 0, OffsetRequest.LatestTime());
+        long latestOffset = KafkaUtils.getOffset(simpleConsumer, config.topic, 0, OffsetRequest.EarliestTime());
         long offsetFromConfig = KafkaUtils.getOffset(simpleConsumer, config.topic, 0, config);
         assertThat(latestOffset, is(equalTo(offsetFromConfig)));
     }
 
     @Test
     public void getOffsetFromConfigAndFroceFromStart() {
-        config.forceFromStart = true;
+        config.ignoreZkOffsets = true;
         config.startOffsetTime = OffsetRequest.EarliestTime();
         createTopicAndSendMessage();
         long earliestOffset = KafkaUtils.getOffset(simpleConsumer, config.topic, 0, OffsetRequest.EarliestTime());