Mockafka is a DSL which allows testing of kafka-streams topologies without need of a kafka and zookeeper installation. It is a Java 8 port of jpzk/mockedstreams.
Add it to your pom.xml
...
<dependency>
<groupId>com.github.carlosmenezes</groupId>
<artifactId>mockafka</artifactId>
<version>0.1.2</version>
<scope>test</scope>
</dependency>
or to your build.gradle
.
testCompile "com.github.carlosmenezes:mockafka:0.1.2"
List<Message<Integer, Integer>> input = Stream.of(1, 2, 3, 4, 5, 6, 7)
.map(i -> new Message<>(i, i))
.collect(Collectors.toList());
Serde<Integer> integerSerde = Serdes.Integer();
List<Message<Integer, Integer>> output = Mockafka
.builder()
.topology(builder ->
builder.stream(integerSerde, integerSerde, "numbersTopic")
.filter((key, value) -> value % 2 == 1)
.to(integerSerde, integerSerde, "oddNumbersTopic")
)
.input("numbersTopic", integerSerde, integerSerde, input)
.output("oddNumbersTopic", integerSerde, integerSerde, 4);
List<Message<Integer, Integer>> expected = Arrays.asList(new Message<>(1, 1), new Message<>(3, 3), new Message<>(5, 5), new Message<>(7, 7));
assertEquals(4, output.size());
assertEquals(expected, output);
MockafkaBuilder builder = Mockafka
.builder()
.topology(builder -> {...})
.input("someInput", Serdes.String(), Serdes.String(), someInput)
.input("anotherInput", Serdes.String(), Serdes.String(), anotherInput);
List<Message<String, String>> someOutput = builder.output("someOutput", Serdes.String(), Serdes.String(), 10);
List<Message<String, String>> anotherOutput = builder.output("anotherOutput", Serdes.String(), Serdes.String(), 10);
You can create state stores using .stores(String... stores)
method and verify
it's contents with .stateTable(String name)
method:
MockafkaBuilder builder = Mockafka
.builder()
.topology(builder -> {...})
.input("someInput", Serdes.String(), Serdes.String(), someInput)
.stores("someStore");
Map<String, String> someStore = builder.stateTable("someStore");
assertEquals(10, someStore.size());
When defining a timestamp extractor in .config(Properties config)
you can verify the content as
windowed state stores using the method .windowStateTable(String name, K key, long timeFrom, long timeTo)
.
Properties properties = new Properties();
properties.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, TestTimestampExtractor.class.getName());
MockafkaBuilder builder = Mockafka
.builder()
.topology(TopologyUtil::windowStateTopology)
.input(TopologyUtil.WINDOW_TOPIC, TopologyUtil.stringSerde, TopologyUtil.integerSerde, createInputKeyValueForWindow().toArray(new Message[]{}))
.stores(TEST_STORE)
.config(properties);
Map<String, Long> actualSomeKey = builder.windowStateTable(TEST_STORE, "somekey", 0, Long.MAX_VALUE);
Map<Long, Long> expectedSomeKey = new HashMap<>();
expectedSomeKey.put(25L, 1L);
expectedSomeKey.put(42L, 2L);
assertEquals(expectedSomeKey, actualSomeKey);
Map<String, Long> actualAnotherKey = builder.windowStateTable(TEST_STORE, "anotherkey", 0, Long.MAX_VALUE);
Map<Long, Long> expectedAnotherKey = new HashMap<>();
expectedAnotherKey.put(50L, 2L);
expectedAnotherKey.put(90L, 1L);
assertEquals(expectedAnotherKey, actualAnotherKey);
If you need some custom configuration, just use the method .config(Properties config)
:
Properties properties = new Properties();
properties.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, TestTimestampExtractor.class.getName());
MockafkaBuilder builder = Mockafka
.builder()
.topology({...})
.input({...})
.stores({...})
.config(properties);