OnPartitionsAssigned & OnPartitionsRevoked Not Invoked #434

Closed
4 of 7 tasks
KarolisKaj opened this issue Feb 6, 2018 · 11 comments

KarolisKaj commented Feb 6, 2018

Description

When partitions are assigned via the Assign method, neither OnPartitionsAssigned nor OnPartitionsRevoked is called when an additional consumer connects. When I use the Subscribe method instead, these events are raised.

We use Assign because we want to start consuming from specific offsets, and with Subscribe you cannot set the offset to start consuming from.

Odd part:
When you call Assign manually inside the OnPartitionsAssigned handler, the events do, for some reason, get raised later in the lifetime of the consumer.

consumer.OnPartitionsAssigned += (_, partitions) =>
{
     consumer.Assign(partitions);
};

How can we use the Assign method and also get notified when a new consumer in the same consumer group connects to the cluster?

How to reproduce

Initialize a consumer with consumer.Assign(new[] { new TopicPartition(topic, 0) });. When you then create another instance with the same config, the events (OnPartitionsAssigned & OnPartitionsRevoked) that would let you handle balancing are not raised.

Checklist

Please provide the following information:

  • Confluent.Kafka nuget version: v0.11.3
  • Apache Kafka version: 0.10
  • Client configuration: "group.id", "DeleteMePlease"
    "bootstrap.servers", "brokers"
    "enable.auto.commit", "false"
    "auto.offset.reset", "earliest"
  • Operating system: Win7x64
  • Provide logs (with "debug" : "..." as necessary in configuration)
  • Provide broker log excerpts
  • Critical issue

mhowlett (Contributor) commented Feb 6, 2018

What you describe, as I read it, should work, so I think I'm misunderstanding.

I assume your topic has more than one partition?

please provide a complete minimal program that demonstrates the problem and I'll take a look at that.

KarolisKaj (Author) commented Feb 7, 2018

Hi, our topic has a single partition replicated across 3 brokers.
Our issue is load balancing: if we assign manually, different consumers with the same consumer group id consume the same messages, and since I do not get any events, I cannot take any action when an additional consumer is added.

How to use the code:
Run the built .exe on your machine and let it reach EOF. Then run a second instance and compare how it behaves with Assign versus Subscribe. Change BrokerList to your brokers.

Here is the code that reproduces the behavior.
In the Main method:

            Task.Run(() => Run_Consume());
            Console.ReadLine();

Creation of config:

        private static Dictionary<string, object> ConstructConfig(string brokerList, bool enableAutoCommit)
        {
            return new Dictionary<string, object>
            {
                { "group.id", "advanced-csharp-consumer" },
                // Use the caller's choice instead of a hard-coded "false".
                { "enable.auto.commit", enableAutoCommit },
                { "auto.commit.interval.ms", 5000 },
                { "statistics.interval.ms", 60000 },
                { "bootstrap.servers", brokerList },
                {
                    "default.topic.config", new Dictionary<string, object>
                    {
                        // Start from the earliest offset when no committed offset exists.
                        { "auto.offset.reset", "smallest" }
                    }
                }
            };
        }

Consumer:

        public static void Run_Consume(string brokerList = "brokers", string topic = "DeleteMePlease")
        {
            using (var consumer = new Consumer<byte[], byte[]>(ConstructConfig(brokerList, false), NullSerializer.Create(), NullSerializer.Create()))
            {
                // Uncomment Subscribe to see events being invoked. However with Assign uncommented these events are not being called.
                //consumer.Subscribe(new[] { topic });
                consumer.Assign(new List<TopicPartition> { new TopicPartition(topic, 0) });

                consumer.OnPartitionEOF += (_, end)
                    => Console.WriteLine($"Reached end of topic {end.Topic} partition {end.Partition}, next message will be at offset {end.Offset}");

                consumer.OnError += (_, error)
                    => Console.WriteLine($"Error: {error}");

                consumer.OnConsumeError += (_, error)
                    => Console.WriteLine($"Consume error: {error}");
                // This is not called when Assign used above.
                consumer.OnPartitionsAssigned += (_, partitions) =>
                {
                    Console.WriteLine($"Assigned partitions: [{string.Join(", ", partitions)}], member id: {consumer.MemberId}");
                    consumer.Assign(partitions);
                };
                // This is not called when Assign used above.
                consumer.OnPartitionsRevoked += (_, partitions) =>
                {
                    Console.WriteLine($"Revoked partitions: [{string.Join(", ", partitions)}]");
                    consumer.Unassign();
                };

                consumer.OnStatistics += (_, json)
                    => Console.WriteLine($"Statistics: {json}");


                Console.WriteLine($"Started consumer, Ctrl-C to stop consuming");

                var cancelled = false;
                Console.CancelKeyPress += (_, e) =>
                {
                    e.Cancel = true; // prevent the process from terminating.
                    cancelled = true;
                };

                consumer.OnMessage += (o, msg) =>
                {
                };

                while (!cancelled)
                {
                    consumer.Poll(TimeSpan.FromMilliseconds(100));
                }
            }
        }

mhowlett (Contributor) commented Feb 7, 2018

Within a consumer group, messages in a single partition are never split between multiple consumers: each partition is read by exactly one consumer in the group, and each consumer reads from zero or more partitions.

KarolisKaj (Author) commented

@mhowlett Thanks for your reply. What we see right now is that when the given code runs with Subscribe, balancing works: only one consumer gets the messages and the others idle, since there is no other partition. This balancing happens through the OnPartitionsAssigned and OnPartitionsRevoked events being invoked. However, if we run the same code with Assign, we cannot rebalance, and both consumers get the same messages even though they are in the same consumer group.

edenhill (Contributor) commented Feb 8, 2018

Subscribe() joins the consumer group and balances available partitions across the joined members, the assignment for each member is then Assign():ed.

Assign() simply starts fetching the given partitions, it does not perform any correlation with other consumers.

KarolisKaj (Author) commented

@edenhill OK, thanks for the elaboration. However, is there a way to get these events invoked? We consume ranges of a topic, so we start consuming from a provided offset, and that is only available via Assign.

edenhill (Contributor) commented Feb 8, 2018

You can do that by transforming the TopicPartition list passed to you in OnPartitionsAssigned into a TopicPartitionOffset list with your choice of start offset which you then pass to Assign().

This is for the underlying C client, but explains its use:
https://github.com/edenhill/librdkafka/wiki/Manually-setting-the-consumer-start-offset
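
A minimal sketch of that suggestion for the 0.11.x .NET client, assuming the goal is to start every assigned partition at one caller-supplied offset. The class name `StartOffsetExample`, the helper `WithStartOffset`, the method `Wire`, and the `startOffset` parameter are hypothetical names introduced for illustration; they are not part of Confluent.Kafka.

```csharp
using System.Collections.Generic;
using System.Linq;
using Confluent.Kafka;

public static class StartOffsetExample
{
    // Hypothetical helper: pairs each assigned partition with the same start offset.
    public static List<TopicPartitionOffset> WithStartOffset(
        IEnumerable<TopicPartition> partitions, long startOffset)
    {
        return partitions
            .Select(p => new TopicPartitionOffset(p, new Offset(startOffset)))
            .ToList();
    }

    // Hypothetical wiring: Subscribe joins the consumer group, and the handler
    // decides where fetching starts once the group has balanced partitions.
    public static void Wire(Consumer<byte[], byte[]> consumer, string topic, long startOffset)
    {
        consumer.OnPartitionsAssigned += (_, partitions) =>
            consumer.Assign(WithStartOffset(partitions, startOffset));

        consumer.OnPartitionsRevoked += (_, partitions) =>
            consumer.Unassign();

        // Register the handlers first, then join the group.
        consumer.Subscribe(new[] { topic });
    }
}
```

With this shape the consumer still takes part in group rebalancing through Subscribe, while the start offset is chosen in the assignment handler. A per-partition offset lookup could replace the single `startOffset` if different partitions need different positions.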

KarolisKaj (Author) commented

@edenhill Thanks a bunch. I did some digging in the code, but the C client wasn't an easy beast to understand. I'll take a look, thanks for your time.

edenhill (Contributor) commented Feb 8, 2018

This is a good start: https://docs.confluent.io/current/clients/consumer.html

KarolisKaj (Author) commented

Just wanted to thank both of you, @edenhill & @mhowlett. The issue I had came from my incorrect understanding of how consumption works and of the difference between the Subscribe and Assign methods. I have now managed to make our Kafka client work as expected!

edenhill (Contributor) commented

Happy to hear you got it sorted out!
