What's the problem with consumer groups? #199

xxhhccisme opened this Issue Aug 21, 2014 · 9 comments


@xxhhccisme

I'm using the consumer group in what I believe is the right way, but it doesn't behave the way I want.
Here is my code:

#!/usr/bin/env python

import sys

from kafka.client import KafkaClient
from kafka.consumer import SimpleConsumer
from kafka.producer import SimpleProducer

def main():
    if len(sys.argv) != 2:
        sys.exit(0)

    kafka = KafkaClient("localhost:9092")
    if sys.argv[1] == "put":
        producer = SimpleProducer(kafka)
        resp = producer.send_messages("my-topic", "some message")
        print resp
    elif sys.argv[1] == "get":
        consumer = SimpleConsumer(kafka, "my-foo-group", "my-topic")
        for message in consumer:
            print message

if __name__ == "__main__":
    main()

What I want is: if I send a message to "my-topic", only one consumer in the group ("my-foo-group") should receive it.
However, what I found is that no matter how many consumer processes I start, all of them end up receiving the message.
Am I doing something wrong, or is this a problem with the kafka-python client?

@wizzat
Collaborator
wizzat commented Aug 21, 2014

Consumer groups are more about offset management than about preventing double consumption between consumers. There is a concept called coordinated consumer groups, but that is not available in non-JVM clients.

The way that I handle this is to spin up a python consumer per partition instead of having every consumer read every partition.
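That per-partition split can be sketched with a small helper that divides partition ids among worker processes. This is an illustrative sketch only: the partition/worker counts and the `partitions=` keyword argument to `SimpleConsumer` shown in the comment are assumptions, not part of the original post.

```python
def partitions_for_worker(num_partitions, num_workers, worker_id):
    # Round-robin split: worker w takes every partition p where
    # p % num_workers == w, so no partition is claimed by two workers.
    return [p for p in range(num_partitions) if p % num_workers == worker_id]

# Each worker process would then pin its consumer to its own partitions,
# e.g. (assuming SimpleConsumer accepts a `partitions` keyword argument):
#   consumer = SimpleConsumer(kafka, "my-foo-group", "my-topic",
#                             partitions=partitions_for_worker(6, 3, worker_id))

if __name__ == "__main__":
    # 6 partitions across 3 workers: worker 0 -> [0, 3], 1 -> [1, 4], 2 -> [2, 5]
    for w in range(3):
        print(w, partitions_for_worker(6, 3, w))
```

Because each worker reads a disjoint set of partitions, a message published to any one partition is consumed by exactly one worker.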


@rqc
rqc commented Aug 21, 2014

@morndust I started using kafka-python just recently as well and found that this is a feature/limitation, as per #173. I ended up using kafka-python for my producers and https://github.com/bpot/poseidon, a Kafka Ruby client, to accomplish what you are asking for on the consumer side.

Basically, my use case is that I want to scale message consumption on a topic horizontally by spinning up more consumer processes on different VMs, not just on the current one (which is what MultiProcessConsumer accomplishes). kafka-python doesn't support this: once you start a new process, it begins from the beginning instead of from the real offset, and the same messages are consumed by every consumer of that topic.

Regardless, kafka-python works really well for everything else. Thanks to everyone involved in this project; it is a real relief not to have to deal with the JVM.

@wizzat
Collaborator
wizzat commented Aug 21, 2014

To be clear: kafka-python supports offset management and resumption. It does not support having C consumers and P partitions and automatically distributing the load so that no message has duplicate readers. If you need help getting offset resumption working, we'd be glad to help you out.

@mumrah
Collaborator
mumrah commented Aug 21, 2014

@morndust the group param in SimpleConsumer (my-foo-group) is only used for offset storing and retrieval, not coordinated consumption. Coordinated consumers (aka high-level consumers or "balanced" consumers) are only available to JVM clients and a few non-JVM clients, but not kafka-python.

@wizzat maybe we should put a note in the README to make this clear?

@xxhhccisme

@mumrah @wizzat

To summarize: the only way to approximate coordinated consumer groups is to split the consumers across partitions?

p.s.: completing the README would be appreciated by Kafka newcomers like me.

@xxhhccisme

One more question: @wizzat wrote, "There is a concept called coordinated consumer groups, but that is not available in non-JVM clients."
Why is that the case? Can you explain a little? Thanks in advance!

@mumrah
Collaborator
mumrah commented Aug 22, 2014

Currently, the "high-level" JVM consumers use ZK to coordinate which partitions are read by which threads. Each consuming thread in the JVM consumer will be reading from at least one partition, and these consumer threads can exist across multiple JVMs. This means you can create one logical "consumer group" that consists of several threads across several JVMs, e.g. a topic with 32 partitions could be read by 4 JVMs with 8 threads each and the data would be evenly distributed among the consumers.
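The even distribution described above can be sketched as a range-style assignment in plain Python. This is a simplified illustration only: the real JVM rebalance also coordinates through ZooKeeper watches and handles offset handoff, and the `range_assign` helper below is hypothetical.

```python
def range_assign(num_partitions, consumer_ids):
    # Split partitions into contiguous ranges, one range per consumer thread;
    # if the counts don't divide evenly, the first consumers (in sorted order)
    # each take one extra partition.
    base, extra = divmod(num_partitions, len(consumer_ids))
    assignment, start = {}, 0
    for i, cid in enumerate(sorted(consumer_ids)):
        count = base + (1 if i < extra else 0)
        assignment[cid] = list(range(start, start + count))
        start += count
    return assignment

# 32 partitions read by 4 JVMs with 8 threads each = 32 consumer threads,
# so every thread ends up with exactly one partition.
threads = ["jvm%d-thread%d" % (j, t) for j in range(4) for t in range(8)]
assignment = range_assign(32, threads)
```

When a consumer joins or leaves, the JVM clients rerun this kind of assignment over the new membership list, which is the rebalance step that non-JVM clients like kafka-python don't implement.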

The reason we haven't added this feature is that there is a complex algorithm involving ZooKeeper to make sure a thread is consuming the correct partition at the correct offset. There are plans to redesign this "coordinated consumption" in Kafka so that it does not depend on ZooKeeper. This will make it easier for clients like kafka-python to do this kind of thing.

So, in other words, we'll have it eventually.

HTH

@dpkp dpkp added the consumer label Aug 22, 2014
@xxhhccisme

thank you so much ! @mumrah

@dpkp dpkp closed this Sep 11, 2014
@ddieterly

I understand that kafka-python does not do consumer rebalancing (consumer failover) when consumers come and go; that has been made clear by this discussion thread. But does kafka-python handle broker rebalancing, and if so, how?

From https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Client+Re-Design we see that the high-level Java Consumers do the following:

  1. Auto/Hidden Offset Management
  2. Auto(Simple) Partition Assignment
  3. Broker Failover => Auto Rebalance
  4. Consumer Failover => Auto Rebalance

We know that kafka-python does not do #4, consumer failover. Which of the others does kafka-python handle, and which does it not? It appears to handle #1 but none of the others. Can we get confirmation on that?

Thanks.
