Write clean and concise tests for your Kafka Streams application.
You can find a blog post on medium with some examples and detailed explanations of how Fluent Kafka Streams Tests work.
You can add Fluent Kafka Streams Tests via Maven Central.
compile group: 'com.bakdata.fluent-kafka-streams-tests', name: 'fluent-kafka-streams-tests-junit5', version: '2.1.0'
<dependency>
<groupId>com.bakdata.fluent-kafka-streams-tests</groupId>
<artifactId>fluent-kafka-streams-tests-junit5</artifactId>
<version>2.1.0</version>
</dependency>
There is also a junit4 version and one without any dependencies to a specific testing framework.
For other build tools or versions, refer to the overview of sonatype.
Here are two example tests which show you how to use Fluent Kafka Streams Tests.
Assume you have a Word Count Kafka Streams application, called WordCount
, and want to test it correctly.
First, start by creating a new test class with your application.
class WordCountTest {
private final WordCount app = new WordCount();
}
Then, set up the TestTopology
.
class WordCountTest {
private final WordCount app = new WordCount();
@RegisterExtension
final TestTopologyExtension<Object, String> testTopology =
new TestTopologyExtension<>(this.app::getTopology, this.app.getKafkaProperties());
}
The TestTopology
takes care of all the inputs, processing, and outputs of you application.
For it to do that, you need to register it as an extension (JUnit5), so certain setup/teardown methods are called.
The constructor expects a topology factory (for a fresh topology in each test) that creates the topology under test.
Additionally, the properties of the KafkaClient
need to be specified.
Broker and application-id must be present (Kafka testutil limitation), but are ignored.
Most importantly, if the application expects default serde for key and value, these must be present in the properties or
explicitly specified with withDefaultKeySerde(Serde serde)
and/or withDefaultValueSerde(Serde serde)
.
To test your appliction, you can simply write a JUnit test.
class WordCountTest {
private final WordCount app = new WordCount();
@RegisterExtension
final TestTopologyExtension<Object, String> testTopology =
new TestTopologyExtension<>(this.app::getTopology, this.app.getKafkaProperties());
@Test
void shouldAggregateSameWordStream() {
this.testTopology.input()
.add("cat")
.add("dog")
.add("cat");
this.testTopology.streamOutput().withSerde(Serdes.String(), Serdes.Long())
.expectNextRecord().hasKey("cat").hasValue(1L)
.expectNextRecord().hasKey("dog").hasValue(1L)
.expectNextRecord().hasKey("cat").hasValue(2L)
.expectNoMoreRecord();
}
}
See the tests for the junit4 and framework agnostic setup.
The TestTopology
has a method .input()
to retrieve the input topic (or .input(String topic)
) if more than one input topic is present).
You can simply add values to your input stream by calling .add(V value)
or .add(K key, V value)
.
To get the output, TestTopology
provides two methods: .streamOutput()
and .tableOutput()
.
They behave just like the input with regard to the number of output topics.
Using the stream version simulates Kafka's stream-semantics, meaning that a key can be present many times in an output stream, whereas the table-semantics only output the newest value of each key.
To check the output records, you can call .expectNextRecord()
and then chain .hasKey(K key)
, .hasKeySatisfying(Consumer<K> requirements)
, .hasValue(V value)
or .hasValueSatisfying(Consumer<V> requirements)
to this call.
Note that calling .expectNextRecord()
by itself without chaining at least one of the .has*
methods will not check for the existence of a next record!
Once you expect no further records, call .expectNoMoreRecord()
to indicate the end of the output stream.
We intentionally kept the API for output checking slim, because there are many tools out there which focus on doing exactly that.
The TestOutput
class implements the Iterable
interface, so you can use your favorite tool to test iterables.
Here is an example using AssertJ.
@Test
void shouldReturnCorrectIteratorTable() {
this.testTopology.input()
.add("cat")
.add("dog")
.add("bird");
assertThat(this.testTopology.tableOutput().withSerde(Serdes.String(), Serdes.Long()))
.extracting(ProducerRecord::key)
.containsAll(List.of("cat", "dog", "bird"));
}
There is also an API to consume a record's key or value in order to embed another assertion framework into our API.
@Test
void shouldReturnCorrectIteratorTable() {
this.testTopology.input()
.add("cat");
this.testTopology.streamOutput().withSerde(Serdes.String(), Serdes.Long())
.expectNextRecord()
.hasKeySatisfying(key -> assertThat(key).isEqualTo("cat"))
.hasValueSatisfying(value -> assertThat(value).isEqualTo(1L))
.expectNoMoreRecord();
}
Alternatively, you can convert the output to List
for use with your assertion framework. Here is an example of this with AssertJ.
@Test
void shouldConvertStreamOutputToList(){
this.testTopology.input()
.add("cat")
.add("dog")
.add("bird");
final List<ProducerRecord<String, Long>>outputs = this.testTopology.streamOutput()
.withSerde(Serdes.String(), Serdes.Long())
.toList();
assertThat(outputs)
.extracting(ProducerRecord::key)
.containsExactly("cat", "dog", "bird");
assertThat(outputs)
.extracting(ProducerRecord::value)
.containsExactly(1L, 1L, 1L);
}
You can find many more tests in this repository's test code .
If you want to contribute to this project, you can simply clone the repository and build it via Gradle. All dependencies should be included in the Gradle files, there are no external prerequisites.
> git clone git@github.com:bakdata/fluent-kafka-streams-tests.git
> cd fluent-kafka-streams-tests && ./gradlew build
Please note, that we have code styles for Java. They are basically the Google style guide, with some small modifications.
We are happy if you want to contribute to this project. If you find any bugs or have suggestions for improvements, please open an issue. We are also happy to accept your PRs. Just open an issue beforehand and let us know what you want to do and why.
This project is licensed under the MIT license. Have a look at the LICENSE for more details.