This repository has been archived by the owner on Jan 8, 2020. It is now read-only.

MarkOffset doesn't persist offset #23

Closed
curtisallen opened this issue Feb 9, 2016 · 11 comments

Comments

@curtisallen

First, thank you for writing this library; it's been a great help to us.

I'm noticing some strange behavior with offset persistence. I've been testing with the provided cmd/sarama-cluster-cli/main.go tool, targeting a Kafka broker set up using sarama's Vagrant environment.

Consumer metadata doesn't seem to be making it to ZooKeeper's /consumers path. This prevents Kafka tools like kafka-consumer-groups.sh from working properly; furthermore, after a broker or consumer restart the offset position is lost, resulting in data-processing loss. I'm not sure if this is a bug in this project or in sarama, but I thought this would be a good place to start.

Here is a session where I use Kafka's verifiable-producer tool to publish 10 messages to a topic:
https://asciinema.org/a/1ohv1sp2h19wz9vkt4w2qcmt2

Then I start a sarama-cluster consumer using:

go run cmd/sarama-cluster-cli/main.go -brokers="192.168.100.67:9091" -group="bsm" -offset="oldest" -topics="test.4" -verbose=true
...
test.4/0/0  3
test.4/0/1  7
test.4/2/0  0
test.4/2/1  4
test.4/2/2  8
test.4/3/0  2
test.4/3/1  6
test.4/1/0  1
test.4/1/1  5
test.4/1/2  9
...

Keeping this consumer running, I inspect ZooKeeper and find no consumer data:

https://asciinema.org/a/1n3dwextiqnl4kfma2lbselmp

It's my understanding that Kafka should be handling all of the ZooKeeper ugliness for us, but that doesn't seem to be the case. Looking into the Kafka ZooKeeper utility, I noticed that the /consumers path is still used on the Kafka side.

Have you ever seen your offsets persist in ZooKeeper using sarama-cluster?
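
For reference, here's a stripped-down version of what I understand the CLI consumer to be doing; this is only a minimal sketch based on my reading of the sarama-cluster API rather than the actual main.go, with the broker, group, and topic values taken from my command above:

    package main

    import (
      "log"

      "github.com/Shopify/sarama"
      cluster "github.com/bsm/sarama-cluster"
    )

    func main() {
      config := cluster.NewConfig()
      config.Consumer.Offsets.Initial = sarama.OffsetOldest // equivalent of -offset="oldest"

      consumer, err := cluster.NewConsumer([]string{"192.168.100.67:9091"}, "bsm", []string{"test.4"}, config)
      if err != nil {
        log.Fatal(err)
      }
      defer consumer.Close() // should commit any marked offsets on shutdown

      for msg := range consumer.Messages() {
        log.Printf("%s/%d/%d\t%s", msg.Topic, msg.Partition, msg.Offset, msg.Value)
        consumer.MarkOffset(msg, "") // mark as processed; commits happen periodically in the background
      }
    }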

@dim
Member

dim commented Feb 9, 2016

Hey @curtisallen, so no, I haven't checked ZooKeeper since Kafka 0.9, but sarama-cluster's tests seem to pass, which suggests that offsets are committed somewhere. To my knowledge, Kafka 0.9 uses a special internal topic for offset management (see here) instead of ZooKeeper; there is also a section on migrating offsets in the latest documentation.
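
If you want to see where the offsets actually end up, something along these lines should work with plain sarama. This is only a rough, untested sketch using the group/topic/broker from your run above; the interesting bit is the offset-fetch version: 0 reads the old ZooKeeper-backed storage, 1 reads the Kafka-backed storage (the internal topic):

    package main

    import (
      "fmt"
      "log"

      "github.com/Shopify/sarama"
    )

    func main() {
      group, topic, partition := "bsm", "test.4", int32(0)

      client, err := sarama.NewClient([]string{"192.168.100.67:9091"}, nil)
      if err != nil {
        log.Fatal(err)
      }
      defer client.Close()

      // ask the group coordinator what it has stored for this group/partition
      broker, err := client.Coordinator(group)
      if err != nil {
        log.Fatal(err)
      }

      req := &sarama.OffsetFetchRequest{ConsumerGroup: group, Version: 1} // 1 = Kafka-backed storage, 0 = ZooKeeper
      req.AddPartition(topic, partition)

      resp, err := broker.FetchOffset(req)
      if err != nil {
        log.Fatal(err)
      }
      block := resp.GetBlock(topic, partition)
      if block == nil {
        log.Fatalf("no offset block returned for %s/%d", topic, partition)
      }
      fmt.Printf("committed offset for %s/%d: %d (err=%v)\n", topic, partition, block.Offset, block.Err)
    }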

@curtisallen
Author

Thanks for the quick reply @dim
Taking ZooKeeper out of the picture, offsets don't seem to be persisted. Here's a gist of the CLI consumer running; notice that the same 10 messages get processed over and over, and nothing is changing on the Kafka side.

I was looking into the tests and love the Kafka-broker-spawning approach you've taken. I could be misreading the tests, but it looks as if the actual offset verification happens via a mock client and doesn't actually query Kafka (here). The Kafka process that is spawned by the BeforeSuite is used to ensure we can fetch partition info, but isn't used to assert that offsets are persisted.
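
For what it's worth, this is roughly the kind of check I'd expect to run against the live broker instead of the mock; just a sketch using sarama's offset manager, with the group/topic from my test above:

    package main

    import (
      "log"

      "github.com/Shopify/sarama"
    )

    func main() {
      client, err := sarama.NewClient([]string{"192.168.100.67:9091"}, nil)
      if err != nil {
        log.Fatal(err)
      }
      defer client.Close()

      // read the committed offset for the group straight from the broker
      om, err := sarama.NewOffsetManagerFromClient("bsm", client)
      if err != nil {
        log.Fatal(err)
      }
      defer om.Close()

      pom, err := om.ManagePartition("test.4", 0)
      if err != nil {
        log.Fatal(err)
      }
      defer pom.Close()

      offset, _ := pom.NextOffset() // committed offset, or Consumer.Offsets.Initial if nothing has been committed
      log.Printf("broker reports next offset %d for test.4/0", offset)
    }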

Finally, I'd expect the kafka-consumer-groups.sh tool to work with the --new-consumer flag, but I haven't had any such luck.

vagrant@vagrant-ubuntu-trusty-64:/opt/kafka-9091/bin$ !186
./kafka-consumer-groups.sh --new-consumer --bootstrap-server 192.168.100.67:9091 --list
bsm
vagrant@vagrant-ubuntu-trusty-64:/opt/kafka-9091/bin$ !194
./kafka-consumer-groups.sh --new-consumer --bootstrap-server 192.168.100.67:9091 --describe --group bsm
Error while executing consumer group command Error reading field 'user_data': java.lang.IllegalArgumentException
org.apache.kafka.common.protocol.types.SchemaException: Error reading field 'user_data': java.lang.IllegalArgumentException
    at org.apache.kafka.common.protocol.types.Schema.read(Schema.java:71)
    at org.apache.kafka.clients.consumer.internals.ConsumerProtocol.deserializeAssignment(ConsumerProtocol.java:109)

@dim
Member

dim commented Feb 11, 2016

So yeah, https://github.com/bsm/sarama-cluster/blob/master/cmd/sarama-cluster-cli/main.go never actually calls MarkOffset; I will add an option.

@dim
Member

dim commented Feb 11, 2016

Ignore me, it actually does. Will investigate.

@dim
Member

dim commented Feb 11, 2016

OK, so I have done this and it works absolutely fine:

  1. on a fresh Kafka instance, created a new topic with 8 partitions: bin/kafka-topics.sh --zookeeper 127.0.0.1:2181 --create --partitions 8 --topic test --replication-factor 1

  2. seeded 100k messages via

    package main
    
    import (
      "fmt"
      "log"
    
      "github.com/Shopify/sarama"
    )
    
    func main() {
      producer, err := sarama.NewAsyncProducer([]string{"127.0.0.1:9092"}, nil)
      abortOn(err)
      defer producer.Close() // Close flushes any buffered messages before returning
    
      for i := 1; i <= 100000; i++ {
        val := sarama.StringEncoder(fmt.Sprintf("DATA-%08d", i))
        msg := &sarama.ProducerMessage{Topic: "test", Key: nil, Value: val}
        producer.Input() <- msg // async send; any errors would surface on producer.Errors()
      }
    }
    
    func abortOn(err error) {
      if err != nil {
        log.Fatal(err)
      }
    }
  3. consumed messages via: sarama-cluster-cli -brokers=127.0.0.1:9092 -group=testgroup -offset=oldest -verbose -topics=test

    ...
    test/7/12479  DATA-00099993
    test/7/12480  DATA-00099994
    test/7/12481  DATA-00099999
    ...
    test/5/12579  DATA-00099966
    test/5/12580  DATA-00099971
    test/5/12581  DATA-00099977
    test/5/12582  DATA-00099997
    
  4. resumed via: sarama-cluster-cli -brokers=127.0.0.1:9092 -group=testgroup -offset=oldest -verbose -topics=test

    2016/02/11 11:53:42 cluster/consumer -46cbc3d7-2205-442c-b634-5ff9a5fe8c80 consume test/0 from 12523
    2016/02/11 11:53:42 cluster/consumer -46cbc3d7-2205-442c-b634-5ff9a5fe8c80 consume test/1 from 12511
    2016/02/11 11:53:42 consumer/broker/0 added subscription to test/0
    2016/02/11 11:53:43 cluster/consumer -46cbc3d7-2205-442c-b634-5ff9a5fe8c80 consume test/2 from 12667
    2016/02/11 11:53:43 consumer/broker/0 added subscription to test/1
    2016/02/11 11:53:43 cluster/consumer -46cbc3d7-2205-442c-b634-5ff9a5fe8c80 consume test/3 from 12476
    2016/02/11 11:53:43 consumer/broker/0 added subscription to test/2
    2016/02/11 11:53:44 cluster/consumer -46cbc3d7-2205-442c-b634-5ff9a5fe8c80 consume test/4 from 12413
    2016/02/11 11:53:44 consumer/broker/0 added subscription to test/3
    2016/02/11 11:53:44 cluster/consumer -46cbc3d7-2205-442c-b634-5ff9a5fe8c80 consume test/5 from 12583
    2016/02/11 11:53:44 consumer/broker/0 added subscription to test/4
    2016/02/11 11:53:45 cluster/consumer -46cbc3d7-2205-442c-b634-5ff9a5fe8c80 consume test/6 from 12345
    2016/02/11 11:53:45 consumer/broker/0 added subscription to test/5
    2016/02/11 11:53:45 cluster/consumer -46cbc3d7-2205-442c-b634-5ff9a5fe8c80 consume test/7 from 12482
    2016/02/11 11:53:45 consumer/broker/0 added subscription to test/6
    2016/02/11 11:53:46 consumer/broker/0 added subscription to test/7
    

As you can see, the consumer resumes from the last consumed offsets. Anything I am missing here?

@curtisallen
Author

Thanks for looking into this. I've seen the behavior you described above; the offset does seem to be persisted. However, if you wait a couple of hours and relaunch your consumer, or restart the Kafka broker, the consumer will start at the oldest offset again.

@curtisallen
Author

Whatever the issue, I think you've proven that this problem isn't part of Sarama or this package. The offsets are persisted but then get lost in Kafka somehow, most likely a Kafka bug. I'll keep digging there.
Thanks for your help!
Closing.

@curtisallen
Author

For posterity's sake...
The reason kafka-consumer-groups didn't work was caused by a bug in Kafka 0.9.0.0:
https://issues.apache.org/jira/browse/KAFKA-2695

You can read the resolution here
https://issues.apache.org/jira/browse/KAFKA-3231?focusedCommentId=15143018&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-15143018

@curtisallen
Author

One more thing to note: the current offset and lag are unknown to Kafka, which could be a bug in this client or in sarama.

./bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server 127.0.0.1:9092 --describe --group testgroup
GROUP, TOPIC, PARTITION, CURRENT OFFSET, LOG END OFFSET, LAG, OWNER
testgroup, test, 0, unknown, 12, unknown, _/192.168.1.5
testgroup, test, 1, unknown, 13, unknown, _/192.168.1.5
testgroup, test, 2, unknown, 12, unknown, _/192.168.1.5
testgroup, test, 3, unknown, 12, unknown, _/192.168.1.5
testgroup, test, 4, unknown, 13, unknown, _/192.168.1.5
testgroup, test, 5, unknown, 13, unknown, _/192.168.1.5
testgroup, test, 6, unknown, 12, unknown, _/192.168.1.5
testgroup, test, 7, unknown, 13, unknown, _/192.168.1.5
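
In the meantime I can compute the lag from the Go side; a rough sketch (broker, group, and topic as in the command above), comparing the committed offset with the log-end offset for each partition:

    package main

    import (
      "log"

      "github.com/Shopify/sarama"
    )

    func main() {
      brokers, group, topic := []string{"127.0.0.1:9092"}, "testgroup", "test"

      client, err := sarama.NewClient(brokers, nil)
      if err != nil {
        log.Fatal(err)
      }
      defer client.Close()

      om, err := sarama.NewOffsetManagerFromClient(group, client)
      if err != nil {
        log.Fatal(err)
      }
      defer om.Close()

      partitions, err := client.Partitions(topic)
      if err != nil {
        log.Fatal(err)
      }
      for _, p := range partitions {
        // log-end offset straight from the broker
        logEnd, err := client.GetOffset(topic, p, sarama.OffsetNewest)
        if err != nil {
          log.Fatal(err)
        }

        // committed offset stored for the consumer group (-1 with the default config if nothing was committed)
        pom, err := om.ManagePartition(topic, p)
        if err != nil {
          log.Fatal(err)
        }
        committed, _ := pom.NextOffset()
        pom.Close()

        log.Printf("%s/%d current=%d logEnd=%d lag=%d", topic, p, committed, logEnd, logEnd-committed)
      }
    }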

@dim
Member

dim commented Feb 24, 2016

@curtisallen
Author

Oh wow, interesting, that does sound similar. I'll have to retest with the latest Kafka and sarama-cluster releases.

On Tue, Feb 23, 2016 at 9:36 PM Dimitrij Denissenko <notifications@github.com> wrote:

Could either of these be related:


