21 changes: 20 additions & 1 deletion external/storm-kafka/README.md
@@ -83,7 +83,7 @@ The KafkaConfig class also has bunch of public variables that controls your appl
public int fetchMaxWait = 10000;
public int bufferSizeBytes = 1024 * 1024;
public MultiScheme scheme = new RawMultiScheme();
- public boolean forceFromStart = false;
+ public boolean ignoreZkOffsets = false;
public long startOffsetTime = kafka.api.OffsetRequest.EarliestTime();
public long maxOffsetBehind = Long.MAX_VALUE;
public boolean useStartOffsetTimeIfOffsetOutOfRange = true;
@@ -120,6 +120,25 @@ spoutConf.scheme = new SchemeAsMultiScheme(new StringScheme());
OpaqueTridentKafkaSpout spout = new OpaqueTridentKafkaSpout(spoutConf);
```

### How KafkaSpout stores offsets of a Kafka topic and recovers in case of failures

As shown in the KafkaConfig properties above, users can control where in the topic the spout starts reading by setting **KafkaConfig.startOffsetTime**.

These are the options (a configuration sketch follows the list):
1. **kafka.api.OffsetRequest.EarliestTime() or -2 (the value returned by EarliestTime())**, which makes the KafkaSpout read from the beginning of the topic
2. **kafka.api.OffsetRequest.LatestTime() or -1 (the value returned by LatestTime())**, which starts at the end of the topic, so the spout reads only new messages written to the topic
3. **System.currentTimeMillis()**, or any other timestamp in milliseconds, which starts from an offset near that point in time
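
For illustration, here is a minimal sketch of setting these options on a SpoutConfig (the ZooKeeper connect string, topic name, zkRoot, and id are placeholder values):

```java
BrokerHosts hosts = new ZkHosts("localhost:2181"); // placeholder ZooKeeper connect string
// SpoutConfig(hosts, topic, zkRoot, id) -- zkRoot and id are placeholders here
SpoutConfig spoutConf = new SpoutConfig(hosts, "myTopic", "/kafkaspout", "myId");
spoutConf.scheme = new SchemeAsMultiScheme(new StringScheme());

// Option 1: read from the beginning of the topic
spoutConf.startOffsetTime = kafka.api.OffsetRequest.EarliestTime();
// Option 2: read only messages written after the spout starts
// spoutConf.startOffsetTime = kafka.api.OffsetRequest.LatestTime();
// Option 3: start from an offset near the current time
// spoutConf.startOffsetTime = System.currentTimeMillis();
```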

When users first deploy a KafkaSpout-based topology, they can use one of the above options. As the topology runs,
KafkaSpout keeps track of the offsets it is reading and writes this offset information under **SpoutConfig.zkRoot + "/" + SpoutConfig.id**.
In case of failures it recovers from the last offset written to ZooKeeper.

Users who deploy a topology, later kill it, and re-deploy it should make sure that **SpoutConfig.id** and **SpoutConfig.zkRoot**
remain the same; otherwise KafkaSpout won't be able to start from the offsets stored in ZooKeeper.

Users can set **KafkaConfig.ignoreZkOffsets** to **true** to make KafkaSpout ignore any ZooKeeper-based offsets
and start from the configured **KafkaConfig.startOffsetTime**, as in the sketch below.
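
A sketch of the redeployment case described above, reusing the placeholder names from the previous example: keeping zkRoot and id stable lets the spout find its stored offsets, while ignoreZkOffsets overrides them when a fresh start is wanted.

```java
// Same zkRoot ("/kafkaspout") and id ("myId") as the original deployment,
// so the spout can find the offsets it stored under zkRoot + "/" + id.
SpoutConfig spoutConf = new SpoutConfig(hosts, "myTopic", "/kafkaspout", "myId");

// Skip the offsets stored in ZooKeeper and start from startOffsetTime instead.
spoutConf.ignoreZkOffsets = true;
spoutConf.startOffsetTime = kafka.api.OffsetRequest.EarliestTime();
```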

## Using storm-kafka with different versions of Scala

Storm-kafka's Kafka dependency is defined as `provided` scope in maven, meaning it will not be pulled in
2 changes: 1 addition & 1 deletion external/storm-kafka/src/jvm/storm/kafka/KafkaConfig.java
@@ -33,7 +33,7 @@ public class KafkaConfig implements Serializable {
public int fetchMaxWait = 10000;
public int bufferSizeBytes = 1024 * 1024;
public MultiScheme scheme = new RawMultiScheme();
- public boolean forceFromStart = false;
+ public boolean ignoreZkOffsets = false;
public long startOffsetTime = kafka.api.OffsetRequest.EarliestTime();
public long maxOffsetBehind = Long.MAX_VALUE;
public boolean useStartOffsetTimeIfOffsetOutOfRange = true;
5 changes: 1 addition & 4 deletions external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java
@@ -60,10 +60,7 @@ public static IBrokerReader makeBrokerReader(Map stormConf, KafkaConfig conf) {


public static long getOffset(SimpleConsumer consumer, String topic, int partition, KafkaConfig config) {
-        long startOffsetTime = kafka.api.OffsetRequest.LatestTime();
-        if ( config.forceFromStart ) {
-            startOffsetTime = config.startOffsetTime;
-        }
+        long startOffsetTime = config.startOffsetTime;
return getOffset(consumer, topic, partition, startOffsetTime);
}

@@ -91,9 +91,9 @@ public PartitionManager(DynamicPartitionConnections connections, String topology
if (jsonTopologyId == null || jsonOffset == null) { // failed to parse JSON?
_committedTo = currentOffset;
LOG.info("No partition information found, using configuration to determine offset");
-        } else if (!topologyInstanceId.equals(jsonTopologyId) && spoutConfig.forceFromStart) {
+        } else if (!topologyInstanceId.equals(jsonTopologyId) && spoutConfig.ignoreZkOffsets) {
_committedTo = KafkaUtils.getOffset(_consumer, spoutConfig.topic, id.partition, spoutConfig.startOffsetTime);
LOG.info("Topology change detected and reset from start forced, using configuration to determine offset");
LOG.info("Topology change detected and ignore zookeeper offsets set to true, using configuration to determine offset");
} else {
_committedTo = jsonOffset;
LOG.info("Read last commit offset from zookeeper: " + _committedTo + "; old topology_id: " + jsonTopologyId + " - new topology_id: " + topologyInstanceId );
@@ -100,7 +100,7 @@ private Map doEmitNewPartitionBatch(SimpleConsumer consumer, Partition partition
if (lastTopoMeta != null) {
lastInstanceId = (String) lastTopoMeta.get("id");
}
-        if (_config.forceFromStart && !_topologyInstanceId.equals(lastInstanceId)) {
+        if (_config.ignoreZkOffsets && !_topologyInstanceId.equals(lastInstanceId)) {
offset = KafkaUtils.getOffset(consumer, _config.topic, partition.partition, _config.startOffsetTime);
} else {
offset = (Long) lastMeta.get("nextOffset");
@@ -157,7 +157,7 @@ private ByteBufferMessageSet fetchMessages(SimpleConsumer consumer, Partition pa
private void reEmitPartitionBatch(TransactionAttempt attempt, TridentCollector collector, Partition partition, Map meta) {
LOG.info("re-emitting batch, attempt " + attempt);
String instanceId = (String) meta.get("instanceId");
-        if (!_config.forceFromStart || instanceId.equals(_topologyInstanceId)) {
+        if (!_config.ignoreZkOffsets || instanceId.equals(_topologyInstanceId)) {
SimpleConsumer consumer = _connections.register(partition);
long offset = (Long) meta.get("offset");
long nextOffset = (Long) meta.get("nextOffset");
6 changes: 3 additions & 3 deletions external/storm-kafka/src/test/storm/kafka/KafkaUtilsTest.java
@@ -110,17 +110,17 @@ public void fetchMessagesWithInvalidOffsetAndDefaultHandlingEnabled() throws Exc

@Test
public void getOffsetFromConfigAndDontForceFromStart() {
-        config.forceFromStart = false;
+        config.ignoreZkOffsets = false;
config.startOffsetTime = OffsetRequest.EarliestTime();
createTopicAndSendMessage();
-        long latestOffset = KafkaUtils.getOffset(simpleConsumer, config.topic, 0, OffsetRequest.LatestTime());
+        long latestOffset = KafkaUtils.getOffset(simpleConsumer, config.topic, 0, OffsetRequest.EarliestTime());
long offsetFromConfig = KafkaUtils.getOffset(simpleConsumer, config.topic, 0, config);
assertThat(latestOffset, is(equalTo(offsetFromConfig)));
}

@Test
    public void getOffsetFromConfigAndForceFromStart() {
-        config.forceFromStart = true;
+        config.ignoreZkOffsets = true;
config.startOffsetTime = OffsetRequest.EarliestTime();
createTopicAndSendMessage();
long earliestOffset = KafkaUtils.getOffset(simpleConsumer, config.topic, 0, OffsetRequest.EarliestTime());