The enclosed applications are demonstrations of various Spring for Apache Kafka approaches.
Each is a Spring Boot application, initially created by Spring Initializr (https://start.spring.io).
Each application shares the following properties in application.yml:

spring:
  kafka:
    consumer:
      enable-auto-commit: false
      auto-offset-reset: earliest
The first disables auto offset commits by the kafka client on a schedule; instead, we rely on the Spring for Apache Kafka listener container to commit the offsets after records are passed to the listeners.
The second tells Kafka that a new consumer group should start consuming from the beginning of the topic - this avoids a race condition where the sample apps might send messages before the listener container has started. The default for this property is to start consuming from the latest offset.
The samples assume Kafka is running on localhost, on the default port 9092.
Each application has an ApplicationRunner @Bean - this is for demonstration purposes only; it takes data typed into stdin and sends it to Kafka.
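For illustration, a minimal sketch of such a runner might look like the following (the topic name "topic1" and the String serialization are assumptions, not taken from the samples):

import java.util.Scanner;

import org.springframework.boot.ApplicationRunner;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.KafkaTemplate;

@Configuration
public class RunnerConfig {

    // Reads lines typed into stdin and sends each one to Kafka; "topic1" is
    // a placeholder topic name, not one used by the samples.
    @Bean
    public ApplicationRunner runner(KafkaTemplate<String, String> template) {
        return args -> {
            Scanner scanner = new Scanner(System.in);
            while (scanner.hasNextLine()) {
                template.send("topic1", scanner.nextLine());
            }
        };
    }

}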
Spring Boot’s autoconfiguration is used to create the infrastructure beans needed for these applications (ProducerFactory, ConsumerFactory, listener container factory, etc.).
This sample actually comprises 3 evolutions of the same application.
The first version (package v1) is truly the simplest, with a single @KafkaListener method which simply prints the message to stdout.
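A minimal sketch of such a listener; the topic and group names are placeholders, not the ones used in the sample:

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class Listener {

    // Prints each record value to stdout; "topic1" and "demo-group" are
    // placeholder names.
    @KafkaListener(topics = "topic1", groupId = "demo-group")
    public void listen(String message) {
        System.out.println("Received: " + message);
    }

}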
Things to note:

- the annotation (as well as the topic listened to) has an attribute groupId - this is used for the kafka group.id property to identify the consumer group; it overrides the group.id property configured by the consumer factory. This was added in version 1.3 to simplify configuration if you wish to use multiple groups in the same application. In version 2.0 and later, the id property can be used instead of the groupId.
- the NewTopic @Bean - version 1.3 added a KafkaAdmin and Boot auto-configures it. This is used to provision the topic on the broker (if not present); notice that it configures the topic to use 10 partitions. The number of partitions for an existing topic will be increased if necessary to match the bean configuration; partitions are not reduced, however. A sketch of such a bean follows this list.
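A sketch of such a NewTopic bean, again assuming the placeholder topic name "topic1":

import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class TopicConfig {

    // The auto-configured KafkaAdmin provisions this topic on the broker if
    // it is not already present; 10 partitions, replication factor 1.
    @Bean
    public NewTopic topic() {
        return new NewTopic("topic1", 10, (short) 1);
    }

}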
This adds request/reply messaging.
The messages are sent to a second listener which upcases the String and sends it on to the first topic.
The KafkaTemplate does not support blocking send-and-receive methods; it also currently lacks async send-and-receive.
Until recently, Kafka had no notion of message headers, so there was no way to add replyTo semantics to outgoing messages.
This has been resolved in the 0.11 kafka-clients library and we expect to add async request/reply to the template at some later date.
(Update: The ReplyingKafkaTemplate was added in version 2.1.3; it is not demonstrated here.)
On the message listener, the @SendTo annotation can be used to send the output to some arbitrary topic, and that is what is used in this example.
Note: The use of @SendTo requires a KafkaTemplate to be wired into the message listener container factory; this is now automatically done by Spring Boot.
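A minimal sketch of such an upcasing listener; the topic and group names are placeholders:

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.stereotype.Component;

@Component
public class UpcaseListener {

    // Upcases the incoming value; @SendTo routes the returned value to the
    // named topic via the KafkaTemplate wired into the container factory.
    @KafkaListener(topics = "topic2", groupId = "upcase-group")
    @SendTo("topic1")
    public String upcase(String in) {
        return in.toUpperCase();
    }

}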
This version implements ConsumerSeekAware, which enables the listener to seek to specific offsets, either during initialization or at runtime.
Care must be taken to perform seek operations only on the consumer thread, since the kafka consumer provided by the kafka-clients library is not thread-safe.
Note: starting with version 2.0, the Consumer<?, ?> object can be provided as a parameter to the listener method, and you can perform seeks directly there.
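For example, a listener that receives the Consumer and seeks on the consumer thread might look like this sketch (the "rewind" trigger message and the topic name are hypothetical):

import java.util.Collections;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.TopicPartition;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class SeekingListener {

    // The listener method runs on the consumer thread, so it is safe to
    // seek here; "rewind" is a hypothetical trigger message.
    @KafkaListener(topics = "topic1", groupId = "seek-group")
    public void listen(String message, Consumer<?, ?> consumer) {
        if ("rewind".equals(message)) {
            consumer.seekToBeginning(
                    Collections.singleton(new TopicPartition("topic1", 0)));
        }
        System.out.println(message);
    }

}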
This sample provides an alternative to using @KafkaListener; it uses Spring Integration instead.
The sample receives a message from kafka, performs some filtering/transformation on it, and sends the message to another topic.
Of course, the kafka channel adapters can be used in any integration scenario, such as receiving a file from FTP and writing its lines to a kafka topic (or vice-versa).
A @KafkaListener method is included to print out the messages sent by the integration flow.
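A sketch of such a flow using the Spring Integration Java DSL; the topic names and the filter/transform logic are illustrative only, not the sample's actual logic:

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.kafka.dsl.Kafka;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.KafkaTemplate;

@Configuration
public class FlowConfig {

    @Bean
    public IntegrationFlow flow(ConsumerFactory<String, String> consumerFactory,
            KafkaTemplate<String, String> template) {
        return IntegrationFlows
                .from(Kafka.messageDrivenChannelAdapter(consumerFactory, "topic1"))
                .filter(String.class, payload -> !payload.isEmpty()) // drop empty values
                .<String, String>transform(String::toUpperCase)      // example transformation
                .handle(Kafka.outboundChannelAdapter(template).topic("topic2"))
                .get();
    }

}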
This sample now includes some features from the upcoming 2.2 release; specifically:

- Creation of the adapter's container from the ConcurrentKafkaListenerContainerFactory - previously, that factory could only be used for @KafkaListener methods.
- The SeekToCurrentErrorHandler can now be given a recoverer to invoke when delivery retries are exhausted; this sample shows the use of a DeadLetterPublishingRecoverer (see the sketch after this list). It has a listener for the dead-letter topic.
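A sketch of how the error handler and recoverer might be wired together; this follows the 2.2-era API, and the constructor details (such as the maximum of 3 delivery attempts shown here) are illustrative:

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
import org.springframework.kafka.listener.SeekToCurrentErrorHandler;

@Configuration
public class ErrorHandlingConfig {

    // After retries are exhausted, the recoverer publishes the failed record
    // to a dead-letter topic (by default "<topic>.DLT").
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> factory(
            ConsumerFactory<String, String> consumerFactory,
            KafkaTemplate<Object, Object> template) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        factory.setErrorHandler(new SeekToCurrentErrorHandler(
                new DeadLetterPublishingRecoverer(template), 3)); // 3 delivery attempts
        return factory;
    }

}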
This sample introduces a third option; it is based on the Spring Cloud Stream project (which uses Spring Integration internally). It also adds retry and publishing bad messages to a dead-letter topic.

spring:
  cloud:
    stream:
      bindings:
        input:
          destination: rjug.dest
          group: rjug.stream
          consumer:
            concurrency: 2
          content-type: text/plain
        output:
          destination: rjug.dest.out
      kafka:
        bindings:
          input:
            consumer:
              enable-dlq: true
Refer to the Spring Cloud Stream reference manual for information about these properties, but notice the enable-dlq property.
When true, failed messages are sent to a topic named error.<dest>.<group> - in this case, error.rjug.dest.rjug.stream.
When the message fail is sent via stdin, the listener throws an exception and the message is routed to the dead-letter topic.
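A sketch of the stream listener, using the annotation-based programming model of that era; the binding names come from the configuration above, while the upcasing logic is illustrative:

import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Processor;
import org.springframework.messaging.handler.annotation.SendTo;

@EnableBinding(Processor.class)
public class StreamHandler {

    // Throwing from the handler triggers retry; once retries are exhausted,
    // enable-dlq routes the record to error.rjug.dest.rjug.stream.
    @StreamListener(Processor.INPUT)
    @SendTo(Processor.OUTPUT)
    public String handle(String in) {
        if ("fail".equals(in)) {
            throw new RuntimeException("failed");
        }
        return in.toUpperCase();
    }

}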
To monitor the output of this application, start console consumers in a terminal window:
$ kafka-console-consumer --bootstrap-server localhost:9092 --topic rjug.dest.out --from-beginning
$ kafka-console-consumer --bootstrap-server localhost:9092 --topic error.rjug.dest.rjug.stream --from-beginning
Recent updates to this sample include @KafkaListener methods for these topics.