Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Copyright (c) 2010-2023. Axon Framework
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.axonframework.extensions.kafka.eventhandling.util;

import io.cloudevents.CloudEvent;
import io.cloudevents.kafka.CloudEventSerializer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.HashMap;
import java.util.Map;

/**
* Test utility for generating a {@link ProducerConfig}.
*
* @author Nakul Mishra
* @author Steven van Beelen
*/
public abstract class ProducerConfigUtil {

private ProducerConfigUtil() {
// Utility class
}

/**
* Minimal configuration required for creating a {@link KafkaProducer}.
* <ul>
* <li><code>key.serializer</code> - {@link StringSerializer}.</li>
* <li><code>value.serializer</code> - {@link StringSerializer}.</li>
* </ul>
*
* @param bootstrapServer the Kafka Container address
* @return the configuration.
*/
public static KafkaProducer<String, CloudEvent> newProducer(String bootstrapServer) {
Map<String, Object> configs = new HashMap<>();
configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
configs.put(ProducerConfig.RETRIES_CONFIG, 10);
configs.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
configs.put(ProducerConfig.LINGER_MS_CONFIG, 1);
configs.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, CloudEventSerializer.class);
return new KafkaProducer<>(configs);}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,227 @@
/*
* Copyright (c) 2010-2023. Axon Framework
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.axonframework.extensions.kafka.integration;

import io.cloudevents.CloudEvent;
import io.cloudevents.core.v1.CloudEventBuilder;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.axonframework.config.Configurer;
import org.axonframework.config.EventProcessingConfiguration;
import org.axonframework.config.ProcessingGroup;
import org.axonframework.eventhandling.EventHandler;
import org.axonframework.eventhandling.EventMessage;
import org.axonframework.eventhandling.ResetHandler;
import org.axonframework.eventhandling.TrackingEventProcessor;
import org.axonframework.extensions.kafka.eventhandling.consumer.streamable.StreamableKafkaMessageSource;
import org.junit.jupiter.api.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.test.context.runner.ApplicationContextRunner;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.EnableMBeanExport;
import org.springframework.jmx.support.RegistrationPolicy;
import org.springframework.stereotype.Component;
import org.springframework.test.context.ContextConfiguration;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.redpanda.RedpandaContainer;

import java.net.URI;
import java.time.Duration;
import java.time.Instant;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;

import static org.awaitility.Awaitility.await;
import static org.axonframework.extensions.kafka.eventhandling.util.ProducerConfigUtil.newProducer;
import static org.junit.jupiter.api.Assertions.*;

@Testcontainers
class TokenReplayIntegrationTest {

@Container
private static final RedpandaContainer REDPANDA_CONTAINER = new RedpandaContainer(
"docker.redpanda.com/vectorized/redpanda:v22.2.1");
private ApplicationContextRunner testApplicationContext;


@BeforeEach
void setUp() {
testApplicationContext = new ApplicationContextRunner()
.withPropertyValues("axon.axonserver.enabled=false")
.withPropertyValues("axon.kafka.fetcher.enabled=true")
.withPropertyValues("axon.kafka.publisher.enabled=false")
.withPropertyValues("axon.kafka.message-converter-mode=cloud_event")
.withPropertyValues("axon.kafka.consumer.event-processor-mode=tracking")
.withPropertyValues("axon.kafka.consumer.bootstrap-servers=" + REDPANDA_CONTAINER.getBootstrapServers())
.withUserConfiguration(DefaultContext.class);
}

@Test
void afterResetShouldOnlyProcessTenEventsIfTimeSetMidway() {
testApplicationContext
.withPropertyValues("axon.kafka.default-topic=counterfeed-1")
.run(context -> {
Counter counter = context.getBean(Counter.class);
assertNotNull(counter);
assertEquals(0, counter.getCount());
Instant between = addRecords("counterfeed-1");
await().atMost(Duration.ofSeconds(5L)).untilAsserted(
() -> assertEquals(20, counter.getCount())
);
EventProcessingConfiguration processingConfiguration = context.getBean(EventProcessingConfiguration.class);
assertNotNull(processingConfiguration);
processingConfiguration
.eventProcessorByProcessingGroup(
"counterfeedprocessor",
TrackingEventProcessor.class
)
.ifPresent(tep -> {
tep.shutDown();
tep.resetTokens(tep.getMessageSource().createTokenAt(between));
assertEquals(0, counter.getCount());
tep.start();
});
await().atMost(Duration.ofSeconds(5L)).untilAsserted(
() -> assertEquals(10, counter.getCount())
);
});
}

@Test
void afterResetShouldOnlyProcessNewMessages() {
testApplicationContext
.withPropertyValues("axon.kafka.default-topic=counterfeed-2")
.run(context -> {
Counter counter = context.getBean(Counter.class);
assertNotNull(counter);
assertEquals(0, counter.getCount());
addRecords("counterfeed-2");
await().atMost(Duration.ofSeconds(5L)).untilAsserted(
() -> assertEquals(20, counter.getCount())
);
EventProcessingConfiguration processingConfiguration = context.getBean(EventProcessingConfiguration.class);
assertNotNull(processingConfiguration);
processingConfiguration
.eventProcessorByProcessingGroup(
"counterfeedprocessor",
TrackingEventProcessor.class
)
.ifPresent(tep -> {
tep.shutDown();
tep.resetTokens(tep.getMessageSource().createHeadToken());
assertEquals(0, counter.getCount());
tep.start();
});
addRecords("counterfeed-2");
await().atMost(Duration.ofSeconds(5L)).untilAsserted(
() -> assertEquals(20, counter.getCount())
);
});
}

private Instant addRecords(String topic) {
Producer<String, CloudEvent> producer = newProducer(REDPANDA_CONTAINER.getBootstrapServers());
sendTenMessages(producer, topic);
Instant now = Instant.now();
sendTenMessages(producer, topic);
producer.close();
return now;
}

private void sendMessage(Producer<String, CloudEvent> producer, String topic) {
CloudEvent event = new CloudEventBuilder()
.withId(UUID.randomUUID().toString())
.withSource(URI.create("source"))
.withData("Payload".getBytes())
.withType("java.util.String")
.build();
ProducerRecord<String, CloudEvent> record = new ProducerRecord<>(topic, 0, null, null, event);
producer.send(record);
}

private void sendTenMessages(Producer<String, CloudEvent> producer, String topic) {
IntStream.range(0, 10).forEach(i -> sendMessage(producer, topic));
producer.flush();
}

@ContextConfiguration
@EnableAutoConfiguration
@EnableMBeanExport(registration = RegistrationPolicy.IGNORE_EXISTING)
public static class DefaultContext {

@Bean
Counter counter() {
return new Counter();
}

@Bean
KafkaEventHandler kafkaEventHandler(Counter counter) {
return new KafkaEventHandler(counter);
}

@Autowired
public void registerProcessor(
Configurer configurer,
StreamableKafkaMessageSource<?, ?> streamableKafkaMessageSource
) {
configurer.eventProcessing()
.registerTrackingEventProcessor("counterfeedprocessor", c -> streamableKafkaMessageSource);
}
}

private static class Counter {

private final AtomicInteger counter = new AtomicInteger();

int getCount() {
return counter.get();
}

void count() {
counter.incrementAndGet();
}

void reset() {
counter.set(0);
}
}

@SuppressWarnings("unused")
@Component
@ProcessingGroup("counterfeedprocessor")
private static class KafkaEventHandler {

private final Counter counter;

private KafkaEventHandler(Counter counter) {
this.counter = counter;
}

@EventHandler
void on(EventMessage<?> eventMessage) {
counter.count();
}

@ResetHandler
void onReset() {
counter.reset();
}
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2010-2022. Axon Framework
* Copyright (c) 2010-2023. Axon Framework
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -54,18 +54,11 @@ private ConsumerSeekUtil() {
*/
public static void seekToCurrentPositions(Consumer<?, ?> consumer, Supplier<KafkaTrackingToken> tokenSupplier,
List<String> topics) {
List<TopicPartition> all = consumer.listTopics().entrySet()
.stream()
.filter(e -> topics.contains(e.getKey()))
.flatMap(e -> e.getValue().stream())
.map(partitionInfo -> new TopicPartition(partitionInfo.topic(),
partitionInfo.partition()))
.collect(Collectors.toList());
List<TopicPartition> all = topicPartitions(consumer, topics);
consumer.assign(all);
KafkaTrackingToken currentToken = tokenSupplier.get();
Map<TopicPartition, Long> tokenPartitionPositions = currentToken.getPositions();
all.forEach(assignedPartition -> {
Map<TopicPartition, Long> tokenPartitionPositions = currentToken.getPositions();

long offset = 0L;
if (tokenPartitionPositions.containsKey(assignedPartition)) {
offset = tokenPartitionPositions.get(assignedPartition) + 1;
Expand All @@ -75,4 +68,21 @@ public static void seekToCurrentPositions(Consumer<?, ?> consumer, Supplier<Kafk
consumer.seek(assignedPartition, offset);
});
}

/**
* Get all the {@link TopicPartition topicPartitions} belonging to the given {@code topics}.
*
* @param consumer a Kafka {@link Consumer}
* @param topics a list with topics
* @return a list of all the {@link TopicPartition topicPartitions}
*/
public static List<TopicPartition> topicPartitions(Consumer<?, ?> consumer, List<String> topics) {
return consumer.listTopics().entrySet()
.stream()
.filter(e -> topics.contains(e.getKey()))
.flatMap(e -> e.getValue().stream())
.map(partitionInfo -> new TopicPartition(partitionInfo.topic(),
partitionInfo.partition()))
.collect(Collectors.toList());
}
}
Loading