
Not consuming the first message on the topic #62

Closed
mauricioszabo opened this issue Dec 30, 2020 · 12 comments · Fixed by #69
Assignees: astubbs
Labels: high, ver:0.3.0.1, verified bug (Something isn't working)
Milestone: 0.3.1

Comments

@mauricioszabo

Ok, this is a really strange bug: I'm trying to use parallel-consumer with Clojure. I'm using a local Kafka cluster (only one broker) to test things on my machine.

If I send a single message, and fire up the consumer, it consumes that message. So far, so good.
If I immediately stop the consumer, send another message, and fire up the consumer again... nothing happens. The message is not consumed at all.

BUT, if I stop the consumer, send ANOTHER message, and fire up the consumer again, it consumes ONLY the new message... not the old one.

If I send a batch of messages, the same problem happens: it ignores the first message, and consumes the rest. Here's a video for reference:

parallel-error.mp4

The code that causes this error is the following; note that there's nothing special about it - it just instantiates a single consumer that prints each received message:

;; Imports assumed for the classes used below (omitted from the original snippet):
(ns example.core
  (:import [org.apache.kafka.clients.consumer KafkaConsumer]
           [io.confluent.parallelconsumer
            ParallelConsumerOptions
            ParallelConsumerOptions$CommitMode
            ParallelConsumerOptions$ProcessingOrder
            ParallelStreamProcessor]))

;; `as-consumer` (definition not shown) presumably adapts a Clojure fn to the
;; java.util.function.Consumer that `.poll` expects.
(defn -main [& args]
  (let [consumer (KafkaConsumer. (doto (java.util.Properties.)
                                   (.put "group.id" "example-1")
                                   (.put "bootstrap.servers" "localhost:9092")
                                   (.put "auto.offset.reset" "earliest")
                                   (.put "enable.auto.commit" false)
                                   (.put "key.deserializer" "org.apache.kafka.common.serialization.StringDeserializer")
                                   (.put "value.deserializer" "org.apache.kafka.common.serialization.StringDeserializer")))
        options  (.. (ParallelConsumerOptions/builder)
                     (ordering ParallelConsumerOptions$ProcessingOrder/UNORDERED)
                     (defaultMessageRetryDelay (java.time.Duration/ofMillis 500))
                     (maxConcurrency 1000)
                     (commitMode ParallelConsumerOptions$CommitMode/PERIODIC_CONSUMER_ASYNCHRONOUS)
                     (consumer consumer)
                     build)
        poller   (doto (ParallelStreamProcessor/createEosStreamProcessor options)
                   (.subscribe ["example"]))]
    ;; print every record handled by the parallel consumer
    (.poll poller (as-consumer (fn [record]
                                 (locking poller
                                   (prn :HANDLING record)))))
    (Thread/sleep 3000)
    (.close poller)))
@mauricioszabo
Author

I'm using the following versions, if it helps:

io.confluent.parallelconsumer/parallel-consumer-core "0.3.0.0"
org.apache.kafka/kafka-clients "2.5.0"

@mauricioszabo
Author

Updated to org.apache.kafka/kafka-clients "2.7.0"; the same problem happens.

@JorgenRingen
Contributor

Verified that it happens in both sync and async commit mode.

long offsetOfNextExpectedMessageToBeCommitted = offset + 1;

offsetOfNextExpectedMessageToBeCommitted (offset + 1) is what actually gets committed, and the record at that offset is thereby ignored after restart.

Consuming offset 0 on topic "foo":
2021-01-07 10:49:26.138  INFO 30543 --- [pool-1-thread-1] c.e.p.ParallelConsumerDemoApplicationKt  : Consumed ConsumerRecord(topic = foo, partition = 0, leaderEpoch = 0, offset = 0, CreateTime = 1610012966114, serialized key size = -1, serialized value size = 9, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = message-0)

Offset to commit returned by WorkManager is 1:
2021-01-07 10:49:26.148 DEBUG 30543 --- [    broker-poll] i.c.p.AbstractOffsetCommitter            : Will commit offsets for 1 partition(s): {foo-0=OffsetAndMetadata{offset=1, leaderEpoch=null, metadata=''}}

After restart, ConsumerCoordinator set offset 1 as fetch-position:
2021-01-07 10:53:16.716  INFO 30575 --- [    broker-poll] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-my-parallel-consumer-1, groupId=my-parallel-consumer] Setting offset for partition foo-0 to the committed offset FetchPosition{offset=1, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[localhost:9092 (id: 0 rack: null)], epoch=0}}

The next message on topic "foo" (offset 1) is ignored, as its offset was already committed. I don't understand why this happens only after restart and not while running 🤔
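
For context, committing offset + 1 in itself follows the standard kafka-clients convention: the committed offset is the offset of the next record to consume, so on restart the record sitting at the committed offset should be the first one delivered, not skipped. A minimal sketch of that convention with the plain consumer API (the topic, group id and class name here are illustrative, not from the project):

// Illustrative sketch of the standard commit/resume convention, not parallel-consumer code.
import java.time.Duration;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class CommitConventionSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "my-parallel-consumer");
        props.put("enable.auto.commit", "false");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("foo"));
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
            for (ConsumerRecord<String, String> record : records) {
                // ... process the record ...
                TopicPartition tp = new TopicPartition(record.topic(), record.partition());
                // Commit the offset of the NEXT record to consume (offset + 1). After a
                // restart the fetch position is set to exactly this committed offset, so
                // the record at that offset is the first one delivered - it must not be
                // treated as already processed.
                consumer.commitSync(Map.of(tp, new OffsetAndMetadata(record.offset() + 1)));
            }
        }
    }
}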

JorgenRingen added commits to JorgenRingen/parallel-consumer that referenced this issue Jan 7, 2021
@JorgenRingen
Contributor

@astubbs : Added a simple integration-test that reproduces the issue: JorgenRingen@ad52540

astubbs linked a pull request Jan 15, 2021 that will close this issue
@astubbs
Contributor

astubbs commented Jan 15, 2021

Hi @mauricioszabo! Welcome to the project and thanks for the feedback!

Sorry I missed this issue - been a bit crazy over the New Years :) That’s awesome you’re using it with Clojure! It’s been on my list of languages to play with for years… I'll put this at the top of my todo list now - I'm very much a bugs first kind of guy.

Ah, good old off-by-one bugs :)

Thank you again @JorgenRingen! Great detective work there, people - though I'm sad that this wasn't already covered in our test suite :/ Perhaps it is, and it only happens when run against a real broker 🤔 ...

I've iterated a bit on the test, added another one that demonstrates what @mauricioszabo describes, and kept @JorgenRingen's test that identifies where it first goes wrong.

On another note @mauricioszabo - if you come across any area where the API could be improved for Clojure users, please let us know! It's possible we can still tweak things, or create a Clojure wrapper in a separate module. (That goes for any JVM language BTW.)

astubbs self-assigned this Jan 15, 2021
astubbs added the labels high and verified bug (Something isn't working) Jan 15, 2021
astubbs added this to the 0.3.1 milestone Jan 15, 2021
@astubbs
Contributor

astubbs commented Jan 15, 2021

Btw, did you come across this by accident, or were you intentionally boundary testing?

Have you had good results with the library otherwise?

I wonder if there are some other interesting boundary conditions that aren't covered by integration tests.

@astubbs
Contributor

astubbs commented Jan 15, 2021

Ok, I think I've got it. Try out the linked PR? It's caused when resuming with no encoded offset information: the first message will always be skipped. The reason it works on the 3rd invocation is that nothing was committed in the 2nd (no messages were processed, because the 2nd message was skipped). In the third invocation it again skips the 2nd message (the +1 bug) and continues from what it sees as the +2 message in the queue (which it is incorrectly told was the previous resume point). Something like that - the code probably explains it better :)
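
To make the off-by-one concrete, here is an illustrative sketch (not the library's actual code) of the two ways a committed offset can be interpreted when computing the resume point:

// Illustrative only - shows why "committed offset + 1" silently skips one message.
public class ResumeOffsetSketch {

    // Kafka semantics: the committed offset IS the next offset to process.
    static long correctResumePoint(long committedOffset) {
        return committedOffset;
    }

    // The off-by-one: treating the committed offset as the last processed record
    // and resuming one past it.
    static long buggyResumePoint(long committedOffset) {
        return committedOffset + 1;
    }

    public static void main(String[] args) {
        long committed = 1; // offset 0 was processed, so offset 1 was committed
        System.out.println(correctResumePoint(committed)); // 1 -> record at offset 1 is processed
        System.out.println(buggyResumePoint(committed));   // 2 -> record at offset 1 is skipped
    }
}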

@JorgenRingen
Contributor

JorgenRingen commented Jan 15, 2021

Just tested the PR locally and the issue seems to be solved as far as I can see 👍 Would be nice if @mauricioszabo could verify :)

> Thank you again @JorgenRingen! Great detective work there, people - though I'm sad that this wasn't already covered in our test suite :/ Perhaps it is, and it only happens when run against a real broker 🤔 ...

Agree - I thought it might be caught by the MultiInstanceRebalanceTest, but it might not show up because that test only triggers a rebalance and not a complete "application restart" 🤔 I've been planning to rewrite that test to cover more of these "application lifecycle scenarios", so I'll try to include this case there.

@JorgenRingen
Contributor

JorgenRingen commented Jan 20, 2021

@astubbs Is it possible to prioritize this as a patch release? Application restarts are very common on platforms like Kubernetes, so this causes anywhere from a few to many skipped messages :-)

(the current workaround is to use auto.offset.reset=latest and a unique consumer group id on startup)
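
A sketch of that workaround with plain consumer properties (the group id scheme is just an illustration, not the project's recommendation):

// Illustrative workaround config: a fresh group id per startup plus offset reset "latest".
import java.util.Properties;
import java.util.UUID;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class WorkaroundConfigSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        // A fresh group id on every startup means there is never a committed offset to resume from,
        props.put("group.id", "my-consumer-" + UUID.randomUUID());
        // so auto.offset.reset applies; "latest" avoids reprocessing the whole topic, at the cost
        // of not consuming anything produced while the application was down.
        props.put("auto.offset.reset", "latest");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        // ... hand this consumer to ParallelConsumerOptions.builder().consumer(consumer) as usual ...
    }
}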

@astubbs
Contributor

astubbs commented Jan 21, 2021

Releasing today :)

astubbs added commits that referenced this issue Jan 21, 2021 (and the same commits to astubbs/parallel-consumer)

…o offsets are encoded in metadata

#62

The off by one issue would cause the first message to be skipped in some situations.

Added simple test for reproducing issue 62 where offset is skipped after restart
Test for full example as described
Test for first error
@astubbs
Contributor

astubbs commented Jan 21, 2021

Released. https://repo1.maven.org/maven2/io/confluent/parallelconsumer/parallel-consumer-parent/0.3.0.1/

@JorgenRingen
Contributor

Tested and verified 👍
