Kafka is quite different from traditional messaging systems such as RabbitMQ!
If you're new to Kafka, check out the following resources for more information about how consumption works in Kafka:
- The original blog post for the modern Java consumer. The .NET consumer works similarly (though not exactly).
- A more recent blog post covering the consumer in detail
- Confluent docs
Consumers typically work as part of a group. At any given time, one and only one consumer in a group will be assigned to read from each partition of a subscribed topic (assuming the group is not currently rebalancing). You use the
Subscribe method to join a group.
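A minimal sketch of a group consumer, to make the Subscribe flow concrete. The broker address, group id and topic name are placeholders (not from this page), and the message value is assumed to be a string:

```csharp
using System;
using System.Threading;
using Confluent.Kafka;

public static class SubscribeExample
{
    // Broker address and group id are placeholders for illustration.
    public static ConsumerConfig BuildConfig() => new ConsumerConfig
    {
        BootstrapServers = "localhost:9092",
        GroupId = "example-group"
    };

    public static void Run(CancellationToken token)
    {
        using var consumer = new ConsumerBuilder<Ignore, string>(BuildConfig()).Build();

        // Requests group membership for this topic; the group decides
        // which partitions this consumer reads.
        consumer.Subscribe("example-topic");
        try
        {
            while (true)
            {
                // Blocks until a message is available or the token is cancelled.
                var result = consumer.Consume(token);
                Console.WriteLine($"{result.TopicPartitionOffset}: {result.Message.Value}");
            }
        }
        catch (OperationCanceledException)
        {
            // Expected on shutdown.
        }
        finally
        {
            // Leave the group cleanly before disposing.
            consumer.Close();
        }
    }
}
```

Calling Close before disposing lets the rest of the group rebalance promptly rather than waiting for a session timeout.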
You aren't required to use consumer groups. You can directly assign to specific partitions using the
Assign method (and never call the
You must specify a
GroupId in your configuration. You must do this even if you don't use any group functionality. We will fix this at some point.
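A rough sketch of reading a specific partition directly with Assign, without joining a group. Note that GroupId is still set, as required above. The topic name, partition number, start offset and broker address are assumptions chosen for illustration:

```csharp
using System;
using Confluent.Kafka;

public static class AssignExample
{
    public static ConsumerConfig BuildConfig() => new ConsumerConfig
    {
        BootstrapServers = "localhost:9092", // placeholder
        GroupId = "example-direct-reader"    // required even though Subscribe is never called
    };

    // Reads messages from partition 0 of a hypothetical topic until
    // maxMessages have been received.
    public static void ReadPartitionZero(int maxMessages)
    {
        using var consumer = new ConsumerBuilder<Ignore, string>(BuildConfig()).Build();

        // Direct assignment: no group membership, no rebalancing.
        consumer.Assign(new TopicPartitionOffset("example-topic", 0, Offset.Beginning));

        var read = 0;
        while (read < maxMessages)
        {
            var result = consumer.Consume(TimeSpan.FromSeconds(1));
            if (result == null) continue; // nothing arrived within the timeout

            Console.WriteLine($"{result.Offset}: {result.Message.Value}");
            read++;
        }
        consumer.Close();
    }
}
```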
It's fine to call
Consume() with a timeout of 0. This is a non-blocking operation: you'll get a message if one is available, and null otherwise.
The AutoOffsetReset property dictates the behavior when a committed offset is not available for a partition or is invalid. If your consumer is not getting messages, try setting this to
AutoOffsetReset.Earliest and see if that helps (common misunderstanding!).
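The two points above might look like this in code. This is a sketch only: the broker address and group id are placeholders, and TryConsume is a hypothetical helper name, not part of the client:

```csharp
using System;
using Confluent.Kafka;

public static class OffsetResetExample
{
    public static ConsumerConfig BuildConfig() => new ConsumerConfig
    {
        BootstrapServers = "localhost:9092", // placeholder
        GroupId = "example-group",           // placeholder
        // Start from the oldest available message when there is no valid
        // committed offset for a partition.
        AutoOffsetReset = AutoOffsetReset.Earliest
    };

    // Hypothetical helper: returns the next message value if one is already
    // available, otherwise null. Never blocks.
    public static string TryConsume(IConsumer<Ignore, string> consumer)
    {
        var result = consumer.Consume(TimeSpan.Zero);
        return result?.Message?.Value;
    }
}
```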
High Level Architecture
The Consumer API sits at a much higher level of abstraction than the Kafka protocol, which is used to communicate with the cluster.
When you call
Consume, you are pulling messages from a local in-memory queue - you are not directly sending requests to the cluster. Behind the scenes, the client orchestrates connecting to the required brokers, automatically correcting in the case of leader changes etc.
Although your application consumes messages one by one, messages are pulled by the client from brokers in batches for efficiency. By default, caching of messages on the client is very aggressive (optimized for high throughput). You may want to reduce this in some scenarios. The configuration property you want is:
There are two types of error in Kafka: retryable and non-retryable. Generally, the client will automatically recover from any retryable error without the application developer needing to do anything or even being informed.
Generally, errors exposed via the error callback or in the log represent retryable errors and are for informational purposes only.
However, if an error has the
IsFatal flag marked as
true (should generally never happen), the consumer is in an unrecoverable state and must be re-created.
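One way to act on the IsFatal flag is to log every error and rebuild the consumer only when a fatal one shows up. The sketch below assumes string message values; the method name and the rebuild-in-a-loop structure are illustrative choices, not a prescribed pattern:

```csharp
using System;
using System.Threading;
using Confluent.Kafka;

public static class FatalErrorExample
{
    public static void Run(ConsumerConfig config, string topic, CancellationToken token)
    {
        while (!token.IsCancellationRequested)
        {
            var fatal = false;

            using var consumer = new ConsumerBuilder<Ignore, string>(config)
                .SetErrorHandler((_, error) =>
                {
                    // Most errors reported here are informational only.
                    Console.Error.WriteLine($"Kafka error: {error.Reason} (fatal: {error.IsFatal})");
                    if (error.IsFatal) fatal = true;
                })
                .Build();

            consumer.Subscribe(topic);

            while (!fatal && !token.IsCancellationRequested)
            {
                var result = consumer.Consume(TimeSpan.FromSeconds(1));
                if (result != null) Console.WriteLine(result.Message.Value);
            }

            // Only attempt a clean leave if the instance is still healthy.
            if (!fatal) consumer.Close();

            // On a fatal error the outer loop builds a fresh consumer.
        }
    }
}
```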
The term 'Assign' is used for more than one purpose unfortunately. There are two related concepts:
- The assignment given to a particular consumer by the consumer group.
- The partitions assigned to be read from by the consumer.
With the Java client, these are always the same. The .NET client is more flexible - it allows you to override the assignment given to you by the group to be whatever you want. This is an advanced, uncommonly used feature. It might be useful, for example, to add a control message partition to be read by every consumer in addition to the assigned partitions.
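A sketch of the control-partition idea, using the partitions-assigned handler overload that returns the partitions to actually read. The control topic name, the start offsets, and the assumption of the default (eager) assignment strategy are all illustrative:

```csharp
using System.Collections.Generic;
using System.Linq;
using Confluent.Kafka;

public static class AssignmentOverrideExample
{
    // Hypothetical control partition that every consumer should also read.
    public static readonly TopicPartition ControlPartition =
        new TopicPartition("control-topic", 0);

    // Group-assigned partitions resume from their committed offsets
    // (Offset.Unset); the control partition starts at the end of the log.
    public static IEnumerable<TopicPartitionOffset> WithControlPartition(
        List<TopicPartition> assigned)
    {
        return assigned
            .Select(tp => new TopicPartitionOffset(tp, Offset.Unset))
            .Concat(new[] { new TopicPartitionOffset(ControlPartition, Offset.End) });
    }

    public static IConsumer<Ignore, string> Build(ConsumerConfig config)
    {
        return new ConsumerBuilder<Ignore, string>(config)
            .SetPartitionsAssignedHandler((consumer, assigned) =>
            {
                // The returned set replaces the assignment given by the group.
                return WithControlPartition(assigned);
            })
            .Build();
    }
}
```

Starting the control partition at Offset.End means each consumer sees only control messages produced after it joins; pick a different start offset if older control messages matter.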
The docs for the .NET client on the Confluent website go into a fair bit of detail about how to commit offsets:
TLDR: you should probably be using
true (the default)
Question: I want to synchronously commit offsets after each consumed message. It's very slow. How do I make it fast?
librdkafka uses the same broker connection both for commits and fetching messages, thus a commit may be backed up behind a long-poll blocking Fetch request. The long-poll behavior of fetch requests is configured with the
fetch.wait.max.ms property that defaults to 100ms. You can decrease that value to decrease offset commit latency at the expense of a larger number of Fetch requests (depending on your traffic pattern). Also see https://github.com/edenhill/librdkafka/issues/1787
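As a sketch of the trade-off described above: auto commit is disabled, each message is committed synchronously, and FetchWaitMaxMs (the .NET config property for fetch.wait.max.ms) is lowered. The value 10 is purely illustrative, and the broker address and group id are placeholders:

```csharp
using System;
using System.Threading;
using Confluent.Kafka;

public static class SyncCommitExample
{
    public static ConsumerConfig BuildConfig() => new ConsumerConfig
    {
        BootstrapServers = "localhost:9092", // placeholder
        GroupId = "example-group",           // placeholder
        EnableAutoCommit = false,            // commits are issued explicitly below
        FetchWaitMaxMs = 10                  // shorter long-poll; more Fetch requests
    };

    public static void Run(string topic, Action<string> process, CancellationToken token)
    {
        using var consumer = new ConsumerBuilder<Ignore, string>(BuildConfig()).Build();
        consumer.Subscribe(topic);
        try
        {
            while (true)
            {
                var result = consumer.Consume(token);
                process(result.Message.Value);

                // Blocks until the commit request completes.
                consumer.Commit(result);
            }
        }
        catch (OperationCanceledException)
        {
            // Expected on shutdown.
        }
        finally
        {
            consumer.Close();
        }
    }
}
```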
todo: is this information still current?
At-most once, at-least once and exactly once semantics
It's possible to write your consume loop such that your application is guaranteed to process each message at-most once, or at-least once. It's also possible to write code that provides neither guarantee, which is what you get by default! You probably want to change this.
When v1.4 is released, you'll also be able to write stream processing applications (kafka -> kafka) with exactly-once semantics.
Default configuration behavior
EnableAutoOffsetStore are both set to true.
With auto commit enabled, the consumer automatically commits offsets to Kafka periodically in a background thread if they have been marked as ready to store. With auto store offset enabled, offsets are marked ready to store immediately prior to a message being delivered to the application via
With this setup:
- Your application may fail to process a message if it crashes between when the
Consume method stores the message offset as ready for commit, and when your application has finished processing the message. This only happens if, before the application crashes, the consumer also successfully sends a commit request to the cluster in the background.
- Your application may process messages more than once. This may occur because offsets are only committed periodically to the cluster - it's possible for your application to process a number of messages and crash before the corresponding background auto commit occurs.
For more information, refer to the Confluent docs.
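One common way to get at-least-once behavior is to keep background auto commit on, turn auto offset store off, and store each offset only after the message has been processed. The following is a sketch under those assumptions; the broker address, group id and the process callback are placeholders:

```csharp
using System;
using System.Threading;
using Confluent.Kafka;

public static class AtLeastOnceExample
{
    public static ConsumerConfig BuildConfig() => new ConsumerConfig
    {
        BootstrapServers = "localhost:9092", // placeholder
        GroupId = "example-group",           // placeholder
        EnableAutoCommit = true,             // stored offsets are committed in the background
        EnableAutoOffsetStore = false        // offsets are stored explicitly after processing
    };

    public static void Run(string topic, Action<string> process, CancellationToken token)
    {
        using var consumer = new ConsumerBuilder<Ignore, string>(BuildConfig()).Build();
        consumer.Subscribe(topic);
        try
        {
            while (true)
            {
                var result = consumer.Consume(token);

                // If this throws or the process crashes, the offset is never
                // stored, so the message is delivered again after a restart.
                process(result.Message.Value);

                // Mark the offset as ready for the next background commit.
                consumer.StoreOffset(result);
            }
        }
        catch (OperationCanceledException)
        {
            // Expected on shutdown.
        }
        finally
        {
            consumer.Close();
        }
    }
}
```

With this arrangement a crash can cause reprocessing but not a skipped message, so the processing step should be safe to repeat.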
The consumer automatically sends 'heartbeat' messages to the group coordinator every
HeartbeatIntervalMs in a background thread. If the coordinator does not receive a heartbeat message within
SessionTimeoutMs, it assumes the
consumer has died, and kicks it out of the consumer group. If the consumer has not in fact died, it will automatically ask to rejoin the group when connectivity is re-established.
MaxPollIntervalMs protects against the situation where the application processing logic may have become unresponsive, but the consumer's background thread is still diligently sending heartbeats to the group coordinator, so the consumer remains in the group. You must call
Consume with a period not greater than
MaxPollIntervalMs, otherwise the consumer will be kicked out of the group.
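For reference, the three settings sit side by side on ConsumerConfig. The values below are illustrative only, not recommendations, and the broker address and group id are placeholders:

```csharp
using Confluent.Kafka;

public static class LivenessConfigExample
{
    public static ConsumerConfig BuildConfig() => new ConsumerConfig
    {
        BootstrapServers = "localhost:9092", // placeholder
        GroupId = "example-group",           // placeholder

        // How often the background thread sends heartbeats.
        HeartbeatIntervalMs = 3000,

        // How long the coordinator waits for a heartbeat before removing the consumer.
        SessionTimeoutMs = 10000,

        // Longest allowed gap between Consume calls before the consumer is
        // considered stuck and removed from the group.
        MaxPollIntervalMs = 300000
    };
}
```

If processing a single message can take longer than MaxPollIntervalMs, either raise the value or move the slow work off the consume loop.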
Why is there no
We haven't implemented these yet (and we want to take the time to do so properly). We do want to add them because they allow you to write more idiomatic C# code.
If you are trying to set up a
HostedService, instead consider setting up a dedicated background thread (tied to app lifetime) and running a standard synchronous consume loop in it. This is completely fine, just not idiomatic C# (everything is async these days). This approach will actually be measurably more performant than an async approach, because async comes with a fair bit of overhead (compared to the number of messages per second you can get out of the Kafka consumer!).
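A minimal sketch of the dedicated-thread approach. ConsumerThread is a hypothetical wrapper class, and the handler callback and string message type are assumptions; create it at application start and dispose it at shutdown:

```csharp
using System;
using System.Threading;
using Confluent.Kafka;

// Hypothetical wrapper that owns one background thread and one consumer.
public sealed class ConsumerThread : IDisposable
{
    private readonly CancellationTokenSource _cts = new CancellationTokenSource();
    private readonly Thread _thread;

    public ConsumerThread(ConsumerConfig config, string topic, Action<string> handle)
    {
        _thread = new Thread(() => Loop(config, topic, handle))
        {
            IsBackground = true,
            Name = "kafka-consumer"
        };
        _thread.Start();
    }

    private void Loop(ConsumerConfig config, string topic, Action<string> handle)
    {
        using var consumer = new ConsumerBuilder<Ignore, string>(config).Build();
        consumer.Subscribe(topic);
        try
        {
            while (true)
            {
                // Plain synchronous consume; this thread does nothing else.
                var result = consumer.Consume(_cts.Token);
                handle(result.Message.Value);
            }
        }
        catch (OperationCanceledException)
        {
            // Dispose was called.
        }
        finally
        {
            consumer.Close();
        }
    }

    public void Dispose()
    {
        // Stop the loop, then wait for the consumer to leave the group.
        _cts.Cancel();
        _thread.Join();
        _cts.Dispose();
    }
}
```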
Alternatively you could fake an async consume method using
await Task.Run(() => consumer.Consume(timeout)). That has more overhead than the dedicated-thread approach, but will allow you to use the standard hosted service pattern (you'll still get hundreds of thousands of messages a second out of it).
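The Task.Run approach can be wrapped in an extension method so the calling code reads like an async API. ConsumeAsync here is a hypothetical helper defined in this sketch, not a method provided by the client:

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using Confluent.Kafka;

public static class ConsumerAsyncExtensions
{
    // Hypothetical helper: runs the blocking Consume call on the thread pool.
    // Returns null if no message arrived within the timeout.
    public static Task<ConsumeResult<TKey, TValue>> ConsumeAsync<TKey, TValue>(
        this IConsumer<TKey, TValue> consumer, TimeSpan timeout)
    {
        return Task.Run(() => consumer.Consume(timeout));
    }
}

public static class AsyncLoopExample
{
    public static async Task RunAsync(
        IConsumer<Ignore, string> consumer, Func<string, Task> handle, CancellationToken token)
    {
        while (!token.IsCancellationRequested)
        {
            // Await one call at a time; never run Consume calls concurrently.
            var result = await consumer.ConsumeAsync(TimeSpan.FromMilliseconds(100));
            if (result == null) continue;

            await handle(result.Message.Value);
        }
    }
}
```

A loop like RunAsync could be called from the ExecuteAsync method of a hosted service, with the short timeout keeping it responsive to cancellation.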