Kafka consumer throughput. Usage of KafkaMessageBusOptions.DefaultConsumerThreadCount #265

Closed
jojopeter opened this issue Sep 16, 2021 · 3 comments

Comments

@jojopeter

Hi,
This is a question about Kafka consumer throughput when using Foundatio in my application.

We subscribe to messages in the following manner.

namespace : Foundatio.Messaging
class : static class MessageBusExtensions
Method:
public static Task SubscribeAsync<T>(this IMessageSubscriber subscriber, Func<T, Task> handler, CancellationToken cancellationToken = default) where T : class {
    return subscriber.SubscribeAsync<T>((msg, token) => handler(msg), cancellationToken);
}

We are using the above method, and our implementation is as follows:

      messageBus.SubscribeAsync<message>(
       m => this.HandleMessageAsync(m)).GetAwaiter().GetResult();
      var options = new KafkaMessageBusOptions
      {
        DefaultConsumerThreadCount = 4,
        ManualCommitBatch = 10,
        ProducerConfig = new ProducerConfig
        {
          BootstrapServers = bootstrapServers,
          Acks = Acks.Leader,
          Partitioner = Partitioner.ConsistentRandom,
          SecurityProtocol = securityProtocol,
          Debug = "security"
        },
        ConsumerConfig = new ConsumerConfig
        {
          GroupId = consumerGroupId,
          BootstrapServers = bootstrapServers,
          AutoOffsetReset = AutoOffsetReset.Latest,
          EnableAutoCommit = true,
          AutoCommitIntervalMs = 5000,
          CancellationDelayMaxMs = 1000,
          SessionTimeoutMs = 100000,
          SecurityProtocol = securityProtocol,
          Debug = "security"
        }
      };
      services.AddFoundatioKafkaMessageBus(options);

We noticed that consumer throughput is very low and are trying to optimize it. As part of this, I changed DefaultConsumerThreadCount from 4 to 8, but that made no difference to the consumer throughput.
Can you please let us know what 'DefaultConsumerThreadCount' is intended to do? Is there any other setting I can change to increase consumer throughput?

Thanks in advance,
Jojo Peter

@niemyjski
Member

We don't ship a Kafka message bus implementation. Which NuGet package are you using? We also try to avoid calling GetAwaiter().GetResult() in an async code path.
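
For illustration, here is a minimal sketch of awaiting the subscription instead of blocking on it. It assumes only the `SubscribeAsync<T>(Func<T, Task>, CancellationToken)` extension quoted in the original post; `OrderMessage`, `OrderSubscriber`, and `StartAsync` are hypothetical names, and the handler body is a placeholder. This is about avoiding sync-over-async, and it is not claimed to change Kafka throughput by itself.

```csharp
using System.Threading;
using System.Threading.Tasks;
using Foundatio.Messaging;

// Hypothetical message type, used only for this sketch.
public sealed class OrderMessage
{
    public string Id { get; set; } = string.Empty;
}

// Hypothetical wrapper that owns the subscription.
public sealed class OrderSubscriber
{
    private readonly IMessageBus _messageBus;

    public OrderSubscriber(IMessageBus messageBus)
    {
        _messageBus = messageBus;
    }

    // Await the subscription rather than calling GetAwaiter().GetResult(),
    // so the calling thread is not blocked while the subscription is set up.
    public async Task StartAsync(CancellationToken cancellationToken = default)
    {
        await _messageBus.SubscribeAsync<OrderMessage>(
            m => HandleMessageAsync(m), cancellationToken);
    }

    // Placeholder handler; the real processing logic would go here.
    private Task HandleMessageAsync(OrderMessage message)
    {
        return Task.CompletedTask;
    }
}
```

A caller that is already async (for example, a hosted service's start method) could then simply `await subscriber.StartAsync(cancellationToken);`.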

@jojopeter
Author

Thanks for your reply.
Sorry, I posted in the wrong repository.
BTW, I am using the 'Aix.KafkaMessageBus' package.

Thanks.

@niemyjski
Member

No problem, I'm going to close this as we don't own that code. I'd recommend posting on their project repository. If you have any feedback on the core libraries we would love to hear it.
