
STORM-822: Kafka Spout New Consumer API #1131

Merged
2 commits merged on Mar 31, 2016

Conversation

hmcl
Contributor

@hmcl commented Feb 21, 2016

This patch is still under development and was uploaded at this point for early testing. Please read the README.

There may be a bug in offsets management, because of an off-by-one difference. I am looking into it.
Polling from an arbitrary offset is possible, but it will come in the next patch, today or tomorrow.
I refactored the code a bit and may have left some unnecessary locking; I am also looking into that.

@connieyang @jianbzhou @tgravescs please let me know of any other requirements you may have and I will address them soon.

Thanks

@tgravescs

@hmcl I haven't looked at the code yet, but I don't see anything Trident related, so I was just curious whether you are planning to add Trident support here or in a separate JIRA?

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.storm.tuple.Values;

public class KafkaRecordTupleBuilder<K,V> implements org.apache.storm.kafka.spout.KafkaTupleBuilder<K, V> {
Contributor

Don't need to qualify KafkaTupleBuilder with full package.

Contributor Author

@knusbaum agree. When I was moving stuff around to organize the packages this happened. It has already been cleaned up and will be in the next patch.

@hmcl
Contributor Author

hmcl commented Feb 22, 2016

@tgravescs trident will be appended to this patch. In the meantime it would be helpful if you could let me know of any other requirements that you may have, and if this initial commit is covering most things that you may need. I am pushing another patch soon with better exception handling, logging, testing, etc. Thanks.

@connieyang

+1 on Trident support. Thanks!

// All tuples in flight when a rebalance occurs are added to the
// blacklist and disregarded when they are later acked or failed.
private boolean isInBlackList(org.apache.storm.kafka.spout.MessageId msgId) {
    return blackList.contains(msgId);
}
Contributor

Can probably just get rid of this method and inline blackList.contains()

@knusbaum
Contributor

I don't think the blacklist is going to work in general since it is possible in Storm to get multiple ack or fail messages for the same messageId. I think it would be better to empty emittedTuples and failedTuples on rebalance and determine if a message should be ignored based on its membership in those maps.

Everything else looks pretty good besides normal cleanup.
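The membership-based approach suggested above (clear the in-flight maps on rebalance and decide whether to honor an ack by membership) could look roughly like this. A minimal sketch with illustrative names, not the actual spout fields:

```java
import java.util.HashMap;
import java.util.Map;

public class InFlightTracker {
    private final Map<Object, Long> emittedTuples = new HashMap<>();

    public void onEmit(Object msgId, long offset) {
        emittedTuples.put(msgId, offset);
    }

    // Called on rebalance (e.g. from onPartitionsRevoked): drop all in-flight state.
    public void onRebalance() {
        emittedTuples.clear();
    }

    // An ack (or fail) for a msgId that is no longer tracked either belongs to
    // a pre-rebalance emission or is a duplicate, and should be ignored.
    // The first call removes the entry, so duplicates naturally return false.
    public boolean shouldProcess(Object msgId) {
        return emittedTuples.remove(msgId) != null;
    }
}
```

Because `shouldProcess` removes the entry on first use, it handles both the rebalance case and the multiple-ack/fail case mentioned above without a separate blacklist.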

    public void run() {
        commit = true;
    }
}, 1000, kafkaSpoutConfig.getOffsetsCommitFreqMs(), TimeUnit.MILLISECONDS);
Contributor

Can we replace this with a static Timer? We might have multiple instances of the spout in a single worker, and having a thread for each of them just to set a volatile boolean to true feels like overkill.
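A minimal sketch of this shared-timer idea, assuming the spout only needs the periodic volatile-flag flip described above (class and method names are illustrative):

```java
import java.util.Timer;
import java.util.TimerTask;

public class CommitTimer {
    // One daemon Timer shared by every spout instance in the worker,
    // instead of one thread per spout instance.
    private static final Timer TIMER = new Timer("kafka-spout-commit", true);

    private volatile boolean commit = false;

    public void schedule(long initialDelayMs, long periodMs) {
        TIMER.scheduleAtFixedRate(new TimerTask() {
            @Override
            public void run() {
                commit = true; // only flips the flag; the spout thread does the actual commit
            }
        }, initialDelayMs, periodMs);
    }

    // Polled from nextTuple(): returns true at most once per timer tick.
    public boolean shouldCommit() {
        if (commit) {
            commit = false;
            return true;
        }
        return false;
    }
}
```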

Contributor Author

@revans2 it may be possible, but it depends on what we want to do. My initial implementation used this thread to do the commits to Kafka. However, that causes a ConcurrentModificationException on the Kafka side, as the consumer is single threaded. The immediate fix was to just create this volatile variable and leave it as is. This was meant to be temporary all along, and I pushed it like this for an early review given the urgency of the patch.

Nevertheless, we should discuss whether we would like to consider, in the future, using the Kafka commitAsync option, which receives an OffsetCommitCallback as a parameter that could be used to manage the bookkeeping state. I suppose even if we consider this it won't be in this patch... but I wonder if this piece of functionality would be helpful for that scenario, if we ever want to pursue it.

I am referring to this comment as C1 in another comment below.

@revans2
Contributor

revans2 commented Feb 22, 2016

Looking through the code I think we are keeping track of way too much state. In some places we cannot help it, but in most places we can probably rely on Storm to keep track of the state for us.

First of all, we probably don't need failed, emittedTuples, or blacklist. Just put the information we need in the message id and let Storm keep track of it for you. The message id that you pass in never leaves the spout and is never read by Storm, except to check whether it is null, so it becomes very convenient for implementing a best-effort replay cache.

public class MessageId {
    private final TopicPartition topicPart;
    private final long offset;
    private final long epoch;
    private final List<Object> values;
    private int numFails;

   ...
}

To implement blacklisting, instead of clearing state and shifting things around, just increment an epoch counter for the entire spout. Then the logic for fail (where we do the replay) would look something like:

public void fail(Object mid) {
    MessageId id = (MessageId) mid;
    id.incNumFails();
    if (id.getEpoch() == currentEpoch.get()) {
      if (id.getNumFails() > CUT_OFF) {
        markAsDone(id);
      } else {
        collector.emit(id.getValues(), id);
      }
    }
}

And ack would be similar

public void ack(Object mid) {
    MessageId id = (MessageId) mid;
    if (id.getEpoch() == currentEpoch.get()) {
      markAsDone(id);
    }
}

What do you think?
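The snippets above assume a rebalance hook that bumps the epoch. A minimal, runnable sketch of the whole scheme, with illustrative names and an assumed CUT_OFF value:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

public class EpochSpoutSketch {
    public static final int CUT_OFF = 5;                       // max replays; assumed value
    public final AtomicLong currentEpoch = new AtomicLong();
    public final List<MessageId> done = new ArrayList<>();     // stands in for markAsDone bookkeeping
    public final List<MessageId> replayed = new ArrayList<>(); // stands in for collector.emit(...)

    public static class MessageId {
        public final long epoch;
        public int numFails;
        public MessageId(long epoch) { this.epoch = epoch; }
    }

    // Every rebalance bumps the epoch, so all older in-flight ids become stale.
    public void onRebalance() {
        currentEpoch.incrementAndGet();
    }

    public void ack(MessageId id) {
        if (id.epoch == currentEpoch.get()) {
            done.add(id);
        }
    }

    public void fail(MessageId id) {
        id.numFails++;
        if (id.epoch == currentEpoch.get()) {
            if (id.numFails > CUT_OFF) {
                done.add(id);     // give up replaying, treat as done
            } else {
                replayed.add(id); // the real spout would re-emit here
            }
        }
    }
}
```

The epoch comparison makes acks and fails from before a rebalance no-ops, which is what the blacklist was trying to achieve.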

@lujinhong
Contributor

+1 on trident spout

@tgravescs

@hmcl It's obvious people are also looking for Trident support, but I'm wondering if it would make sense to split that into a separate JIRA. That way we could get the regular spout in, so the people waiting on it would be unblocked, and then do Trident after.

Maybe it depends on how far along you are or how much overlap they have. What do you think?

Do you want help with any parts of this?

@hmcl
Contributor Author

hmcl commented Feb 25, 2016

@tgravescs there is probably a bit of miscommunication here. I haven't completed the spout part yet because I have been waiting on answers to some follow-up questions I asked on the code review comments, so that I can address them completely. I will push what I have, and Trident can go in separately if that fits your timeline better.

@revans2
Contributor

revans2 commented Feb 25, 2016

@hmcl sorry, I missed your questions among all my e-mail.

@hmcl
Contributor Author

hmcl commented Feb 25, 2016

@revans2 no worries. Concerning your code snippet suggestion, I agree with most of it. We can definitely keep a lot of the state in the MessageId object, and I agree it is indeed the ideal solution. I considered it initially but was concerned about how expensive it would be to keep all of that state in the MessageId.

However, I don't think that the markAsDone(id) strategy will suffice to address the cases where tuples get acked out of order, or where a sequence of tuples gets acked in order but shifted from the last committed offset. In these scenarios we need a way to keep track of the offset sequences that are ready to be committed. I still think that keeping an acked Map<TopicPartition, OffsetEntry> will take care of this in an efficient way. If there is a better way, of course, I would totally welcome a suggestion.
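The per-partition bookkeeping described here can be sketched with a sorted set of acked offsets from which only the longest contiguous run past the committed offset is committable. All names and the O(log N) structure follow the discussion, not the final spout code:

```java
import java.util.TreeSet;

// Sketch of one partition's offset bookkeeping (illustrative, not the real OffsetEntry).
public class OffsetEntry {
    private long committedOffset;                 // last offset committed to Kafka
    private final TreeSet<Long> ackedOffsets = new TreeSet<>();

    public OffsetEntry(long committedOffset) {
        this.committedOffset = committedOffset;
    }

    public void add(long offset) {
        if (offset > committedOffset) {           // ignore acks at or below the committed offset
            ackedOffsets.add(offset);
        }
    }

    // Walk the acked offsets in order and stop at the first gap. Returns the
    // highest offset that is safe to commit, or -1 if nothing is committable yet.
    public long findNextCommitOffset() {
        long next = committedOffset;
        for (long offset : ackedOffsets) {
            if (offset == next + 1) {
                next = offset;
            } else if (offset > next + 1) {
                break;                            // gap: a tuple in between is still in flight
            }
        }
        return next > committedOffset ? next : -1;
    }

    public void commit(long offset) {
        committedOffset = offset;
        ackedOffsets.headSet(offset, true).clear(); // drop everything now covered by the commit
    }
}
```

Out-of-order acks simply land in the sorted set; the commit only advances once the run below the gap is complete.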

@revans2
Contributor

revans2 commented Feb 25, 2016

I agree that we need to keep track of outstanding tuples + offsets, and the code you have around acked feels OK, but we could possibly clean it up a little. I called it markAsDone instead of addAckedTuples because I didn't really like having a tuple that failed too many times be marked as acked, but thinking about it more it is not that big of a deal.

@hmcl
Contributor Author

hmcl commented Feb 26, 2016

@tgravescs @revans2 I am just finalizing some testing and will push the patch after lunch.

@hmcl
Contributor Author

hmcl commented Feb 28, 2016

@tgravescs @revans2 I have pushed the latest changes. Please let me know of any feedback or further requirements you may have. Thanks.

@hmcl
Contributor Author

hmcl commented May 9, 2016

@jianbzhou thanks for your feedback. Let me take a look at this and I will get back to you shortly.

@jianbzhou

Thanks hmcl.

We just found that the log below constantly shows up; the spout seems to repeatedly try to commit an offset that was actually already committed to Kafka. It might be caused by a group rebalance, so an offset smaller than the committed offset is acked back late.

For example (this is our assumption, kindly correct me if wrong): one consumer commits offset 1000, polls messages 1001-1050 and emits them, and messages 1001-1009 are acked; then a rebalance happens, another consumer polls messages from 1000 to 1025 and commits the offset up to 1010, and then message 1010 (which was emitted before the rebalance) is acked back. Offset 1010 will never be committed, per the logic in the findNextCommitOffset method, because this offset was already committed to Kafka successfully.

Log is:
2016-05-09 03:02:14 io.ebay.rheos.KafkaSpout [INFO] Unexpected offset found [37137]. OffsetEntry{topic-partition=oradb.core4-lcr.caty.ebay-bids-3, fetchOffset=37138, committedOffset=37137, ackedMsgs={topic-partition=oradb.core4-lcr.caty.ebay-bids-3, offset=37137, numFails=0}|{topic-partition=oradb.core4-lcr.caty.ebay-bids-3, offset=37137, numFails=0}}

We applied the fix below: in the OffsetEntry.add(KafkaSpoutMessageId msgId) method, we changed the code to only add an acked message when its offset is bigger than the committed offset.

public void add(KafkaSpoutMessageId msgId) { // O(log N)
    if (msgId.offset() > committedOffset) { // this guard is newly added
        ackedMsgs.add(msgId);
    }
}

Could you please help take a look at the above and let me know your thoughts? Thanks.

@jianbzhou

Hi hmcl,
Just FYI: in method doSeekRetriableTopicPartitions, we found the line below:

kafkaConsumer.seekToEnd(rtp); // Seek to last committed offset

The code contradicts its comment, so we replaced the line above with:

OffsetAndMetadata commitOffset = kafkaConsumer.committed(rtp);
kafkaConsumer.seek(rtp, commitOffset.offset()); // Seek to last committed offset

If you have any comments, please let me know. Thanks!

For all the issues identified above we applied quick-and-dirty fixes; testing is in progress and we will share the final results later.

@hmcl
Contributor Author

hmcl commented May 19, 2016

@jianbzhou thanks once again for your feedback. Can you please share the results of your tests? I am working on adding more tests, as well as fixing some of these corner cases, possibly using some of your suggested solutions if they turn out to be the most suitable.

@jianbzhou

@hmcl sorry for the late reply. We made some quick-and-dirty fixes for the issues above; I will share the new spout with you via email so you can do a quick comparison. It now seems to be working for our project. Please review it and let us know your comments or concerns about the fixes.

One of our customers who also uses the spout found some other issues:

  1. Work load is not distributed to all spout tasks (as per the storm topology)
  2. No progress on some partitions (as per the log)
  3. No commit on some partitions (as per the log)

I will start looking into these tomorrow once I get the detailed log file. If you have any clues, please advise.

@hmcl
Contributor Author

hmcl commented May 24, 2016

@jianbzhou can you please email me what you have so that we can provide a fix? Is there a way you can share the Kafka setup that causes the issues you mention?

I should upload my test cases later today, and that should help us address any remaining issues. Thanks.

@jianbzhou

@hmcl, could you share your email address? I will send our latest spout so you can have a quick review; this version has been working in our testing environment for about a week. Our customer faced an issue where the load does not seem to be well distributed across all partitions in the 0.9 KafkaSpout, and some partitions show no commits and no progress. I am still waiting for the Kafka setup details from the customer and shall send them to you once I have them.

@hmcl
Contributor Author

hmcl commented May 24, 2016

@jianbzhou please email me at:

[email address shown in image]

@jianbzhou

@hmcl, sorry for the late reply, I was on leave. I have just sent the updated spout to you; please help review it. Below are the major changes:

  1. In the poll method, changed numUncommittedOffsets < kafkaSpoutConfig.getMaxUncommittedOffsets()
     to emitted.size() < kafkaSpoutConfig.getMaxUncommittedOffsets().
  2. In method doSeekRetriableTopicPartitions, your code seems to contradict its comment, so I changed it
     from:
     else {
         kafkaConsumer.seekToEnd(rtp); // Seek to last committed offset
     }
     to:
     else {
         // kafkaConsumer.seekToEnd(rtp); // Seek to last committed offset
         OffsetAndMetadata commitOffset = kafkaConsumer.committed(rtp);
         kafkaConsumer.seek(rtp, commitOffset.offset()); // Seek to last committed offset
     }
  3. In the ack method, we found acked.get(msgId.getTopicPartition()) might return null, so we added some defensive validation; this is possibly due to a Kafka consumer rebalance, after which the partition no longer belongs to this spout.
  4. In the OffsetEntry.add method, we added a condition to only add the message when msgId.offset() > committedOffset. The same change was also applied in method doSeekRetriableTopicPartitions.

@jianbzhou

@hmcl, currently if the user sets firstPollOffsetStrategy=UNCOMMITTED_LATEST or LATEST, the spout will not work: if a Kafka consumer rebalance happens, the offset is seeked to the end, so many messages are never consumed/emitted/acked or failed, the spout never finds the next contiguous offset to commit, and the log keeps showing "Non continuous offset found"...

I have a question here: if a spout reads and emits a message, can I assume Storm will ensure the message is acked or failed without exception? If an emitted message could fail to get an ack or fail back under some strange situation, we could never find the contiguous message to commit, which would directly break the spout. Could you please confirm whether my assumption is correct?

If my assumption is not correct, meaning an emitted message may never be acked or failed back, then we must change the spout (it needs a timeout setting for when it fails to find the next contiguous message to commit); currently the spout will always look for the next contiguous message to commit, and it will try forever...

Because the spout always looks for the next contiguous message to commit, we need to be cautious with the method below:

private boolean poll() {
    return !waitingToEmit() && emitted.size() < kafkaSpoutConfig.getMaxUncommittedOffsets();
}

If MaxUncommittedOffsets is too small, the spout will frequently stop polling from Kafka; if a rebalance happens and we seek back to the failed message while the spout has stopped polling, the spout will also fail to find the next message to commit. Currently we set this value to 200000 and it seems to be working fine for now.
Looking forward to hearing from you! Thanks!

@hmcl
Contributor Author

hmcl commented May 26, 2016

@jianbzhou looking at it.

@hmcl
Contributor Author

hmcl commented May 28, 2016

@jianbzhou I confirm that your suggested fix for doSeekRetriableTopicPartitions is correct. I am going to include that in the next patch.

@connieyang

Following up on the Trident API support for the new KafkaSpout implementation... is anyone working on this? Thanks.

@hmcl
Contributor Author

hmcl commented May 28, 2016

@connieyang I am finishing addressing some issues brought up by the initial users of this kafka spout, as well as unit test coverage, and will push the trident API right after.

@hmcl
Contributor Author

hmcl commented May 28, 2016

@jianbzhou any updates on

`One customer of us who also use the spout they found some other issues:

  1. Work load is not distributed to all spout tasks(as per the storm topology)
  2. No progress on some partitions (as per the log)
  3. No commit on some partitions (as per the log)
    I will start looking into that tomorrow once I get the detail log file. Also if you have any clue on this please kindly advise.`

This is a bit surprising. Can you elaborate on this? Thanks.

@jianbzhou

@hmcl, 1) the work load not being distributed well is not because of the spout; that was a Kafka cluster setup issue and is now resolved. 2) For the other two, I dug into the log (sent to you via email): it seems that every time a rebalance happens, the spout seeks to an offset larger than the committed offset for that partition. Per my understanding, this causes some messages to never be consumed/emitted, so the log keeps showing "Non continuous offset found".
The user's spout settings are: firstPollOffsetStrategy=UNCOMMITTED_LATEST, pollTimeoutMs=2000, offsetCommitPeriodMs=10000, maxRetries=2147483647.
I know firstPollOffsetStrategy cannot be EARLIEST or LATEST, but it seems to me UNCOMMITTED_LATEST should not cause this issue.
I asked the user to try UNCOMMITTED_EARLIEST, and so far the issue has not happened again per the log, though it may happen later...
From the code perspective, I cannot understand why this weird behavior happened; could you help?

Also, per our previous testing: once, when a worker died and a rebalance happened, one spout (not in the dead worker) had some messages that were never acked or failed back. That also caused "Non continuous offset found" to show many times in the log, which means no message will ever be committed to Kafka. The only solution was to restart the Storm topology.

We emit messages via kafkaSpoutStreams.emit(collector, tuple, msgId). Could you please confirm that Storm ensures all messages emitted by the spout will be acked or failed back without exception? If this is not the case, the spout will not be able to find the contiguous offset to commit, and we must fix this urgently, as we plan to release the change early next month. Please advise. Thanks!

@hmcl
Contributor Author

hmcl commented May 31, 2016

@jianbzhou Storm guarantees that all messages are either acked or failed. There is the property "topology.message.timeout.secs": https://github.com/apache/storm/blob/master/storm-core/src/jvm/org/apache/storm/Config.java#L1669

If Storm is configured to use acks and the acks don't arrive within that time, the tuple is failed and can be replayed. You don't have to worry about the scenario you described or implement the timeout yourself.
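The timeout above is an ordinary topology config entry. A minimal sketch of setting it, where the keys are the standard Storm config keys and the values are illustrative, not recommendations:

```java
import java.util.HashMap;
import java.util.Map;

public class TimeoutConfigSketch {
    // Builds a topology config map using the standard Storm config keys.
    public static Map<String, Object> build() {
        Map<String, Object> conf = new HashMap<>();
        conf.put("topology.message.timeout.secs", 30); // unacked tuples are failed after 30s
        conf.put("topology.acker.executors", 1);       // setting this to 0 disables acking
        return conf;
    }
}
```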

@jianbzhou

@hmcl, today we found a NullPointerException and applied a fix as below:

In method doSeekRetriableTopicPartitions, if a partition was never committed before a message is failed back, we hit the issue below. Could you please review it and let me know if this fix is okay?

The code change is from:

// kafkaConsumer.seekToEnd(rtp); // Seek to last committed offset
OffsetAndMetadata commitOffset = kafkaConsumer.committed(rtp);
kafkaConsumer.seek(rtp, commitOffset.offset()); // Seek to last committed offset

to:

// kafkaConsumer.seekToEnd(rtp); // Seek to last committed offset
OffsetAndMetadata commitOffset = kafkaConsumer.committed(rtp);
if (commitOffset != null) {
    kafkaConsumer.seek(rtp, commitOffset.offset()); // Seek to last committed offset
} else {
    LOG.info("In doSeekRetriableTopicPartitions, topic partition is {}, no offset was committed for this partition, will seek back to the initial offset {}", rtp, offsetEntry.initialFetchOffset);
    kafkaConsumer.seek(rtp, offsetEntry.initialFetchOffset); // No offset committed for this partition; seek to the original fetch offset
}

@hmcl
Contributor Author

hmcl commented Jun 2, 2016

@jianbzhou thanks. Looking at it.

@jianbzhou

@hmcl, just FYI, we found a new issue:
When a rebalance happens, in method onPartitionsAssigned -> initialize(Collection partitions), we see the two lines below:

final OffsetAndMetadata committedOffset = kafkaConsumer.committed(tp);
final long fetchOffset = doSeek(tp, committedOffset);

If the committedOffset is out of range (say the Kafka log file was removed), then when the poll() method is called the offset is reset according to the auto.offset.reset property. The newly polled messages then have larger offsets, so there is a gap between the committed offset and the acked offsets, and no contiguous offset will ever be found.
We will apply a quick fix for this.
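The gap described here can be made explicit with a small check before seeking. A pure-logic sketch with illustrative names; the earliest available offset would have to come from the consumer or broker metadata:

```java
public class FetchOffsetResolver {
    /**
     * Returns the offset the consumer should actually seek to. If the last
     * committed offset has been purged from the log (it is older than the
     * earliest offset the broker still has), resync to the earliest offset
     * so the contiguous-commit bookkeeping does not stall forever.
     */
    public static long resolve(long committedOffset, long earliestAvailable) {
        return (committedOffset < earliestAvailable) ? earliestAvailable : committedOffset;
    }
}
```

The spout would then also treat everything below the returned offset as committed, so the acked-offset tracking restarts from a reachable position.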

@jianbzhou

@hmcl and all, the new spout fits at-least-once semantics and works fine for us, thanks a lot! Very recently one of our key customers asked for an at-most-once implementation. Do we have any plan for one? They set topology.acker.executors=0 and found the spout does not work. Could you please help evaluate: 1) will we implement this? 2) roughly how much time is needed? Thanks.

The requirement from the customer: "topology.acker.executors" is a Storm parameter that gives at-least-once when it is not 0 and at-most-once when it is 0. We want to know whether we have an at-most-once implementation.

@hmcl
Contributor Author

hmcl commented Aug 20, 2016

@jianbzhou you can obtain at-most-once semantics by setting maxRetries to zero. Here is the method to do so:

https://github.com/apache/storm/blob/master/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java#L206

@jianbzhou

@hmcl and all, we have communicated via email for a while; going forward let's talk in this thread so everyone is on the same page.
Based on the spout from the community (written by you), we made several fixes, and it has worked quite stably in our production for about 6 months.

We want to share the latest spout with you. Could you please kindly review it and merge into the community version whichever fixes are reasonable? We want to avoid diverging too much from the community version.

Below are our major fixes:

  1. For a failed message, in nextTuple the spout originally seeks back to the non-contiguous offset so the failed message is polled again for retry. Say we seek back to message 10 for retry; if the Kafka log file was purged and the earliest offset is now 1000, we seek to 10 but are reset to 1000 per the reset policy, so we can never poll message 10 and the spout stops working.
     Our fix: we catch the out-of-range exception, commit the earliest offset first, then seek to the earliest offset.

  2. Currently the way to find the next offset to commit is very complex and breaks under some edge cases: a) no message is acked back because a bolt has an issue or cannot keep up with the spout's emits; b) seeking back happens frequently and much faster than messages are acked back.
     We give each message a status (None, emitted, acked, failed); if the failure count exceeds the maximum retries, the status is set to acked.

  3. One of our use cases needs ordering at the partition level, so after seeking back for a retry we re-emit all the following messages regardless of whether they were emitted before. If possible, maybe you can add an option to configure this: either re-emit all messages from the failed one onward, or emit only the failed one, as in the current version.

  4. We record the message counts for acked, failed, and emitted, just for statistics.

Could you please review it and let us know if you can merge it into the community version? Any comments or concerns, please feel free to let us know. By the way, I have just sent the latest code to you via email.

@erikdw
Contributor

erikdw commented Jan 4, 2017

Btw, I just send the latest code to you via email.

@jianbzhou: can you please clarify the "you" in this statement?

we have communicated via email for a while and going forward let's talk in this thread so everyone is on the same page.

@jianbzhou : wouldn't it be more appropriate to open a new STORM-XXXX JIRA and then communicate via that JIRA? As opposed to private emails with diffs, or adding comments into a merged and done PR?

@hmcl
Contributor Author

hmcl commented Jan 4, 2017

@jianbzhou thanks for your suggested fix and for the summary of changes. I think that the best way to go about incorporating your changes is to create a JIRA with a summary along the lines of "Kafka Spout Improvements/Fixes", and in the description put the contents you pasted above. We can then create a pull request with the suggested changes and, after it is reviewed, merge it into master.

It may be a non-trivial merge, because other users have already found and fixed some bugs in the code, so your suggested fix likely has a different base commit. Furthermore, we have to evaluate how to safely add your suggested fixes on top of the other fixes, which apparently are currently working. There is also a refactoring of the spout going on, which will add more diffs, so we have to take everything into consideration.

@jianbzhou I suggest the following: can you please create the JIRA, or, if you don't have the permissions to do so, let me know and I can do it. Then you can either attach your patch to the JIRA or create a pull request with a commit header that matches the JIRA summary; that will link the pull request to the JIRA. If you decide to attach the patch, or even the file with the Java source, to the JIRA, I will create the pull request.

@jianbzhou would you consider adopting a community version that may include only part of your fixes, alongside fixes committed by other contributors? Or would you rather run the version that you currently have in your production environment?

@erikdw "you" would be "me" :).

@jianbzhou

@erikdw @hmcl, sorry for the late reply.
Including only part of my fixes is okay with me; we hope the community version eventually has the proper fixes so we can use it. I don't have the access; could you please help me create the JIRA and pull request (please feel free to let me know if you need any help from me)? Many thanks.

@erikdw
Contributor

erikdw commented Jan 6, 2017

@jianbzhou : you can file a STORM JIRA ticket yourself actually -- you just need to create an Apache JIRA account.

@jianbzhou

Thanks @erikdw! Sorry, I was out of the office for the last couple of days. I just created a JIRA account; I will create a JIRA ticket ASAP and assign it to hmcl.

@jianbzhou

@hmcl, I filed JIRA ticket STORM-2292 and attached the code to it... please let me know if you have any comments.

@hmcl
Contributor Author

hmcl commented Jan 16, 2017

@jianbzhou Thanks for filing the JIRA. I have assigned it to myself so that it is easier to keep track of and follow up on.
