Fix issue 260 by adding a topic function. #264

Merged (5 commits) on Mar 31, 2022
Changes from 1 commit
15 changes: 10 additions & 5 deletions .github/workflows/main.yml
@@ -33,10 +33,19 @@ jobs:
server-username: MAVEN_USERNAME
server-password: MAVEN_PASSWORD

- name: Regular build
- name: Run regular build
run: |
./mvnw -B -U clean verify

- name: Run integration tests
run: | # no clean
./mvnw -B -U -Pintegration-test -DskipExamples

- name: Build coverage report
if: matrix.sonar-enabled
run: | # no clean
./mvnw -B -U -Pcoverage-aggregate -DskipExamples

- name: Sonar analysis
if: matrix.sonar-enabled
run: | # no clean
@@ -48,10 +57,6 @@ jobs:
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}

- name: Run integration tests
run: | # no clean
./mvnw -B -Pitest

- name: Deploy to Sonatype
if: success()
run: | # no clean, no tests, no examples
20 changes: 19 additions & 1 deletion .github/workflows/pullrequest.yml
@@ -11,7 +11,9 @@ jobs:
matrix:
include:
- java-version: 8
sonar-enabled: false
- java-version: 11
sonar-enabled: true
fail-fast: false # run both to the end

steps:
@@ -34,4 +36,20 @@ jobs:

- name: Run integration tests
run: | # no clean
./mvnw -B -Pitest
./mvnw -B -U -Pintegration-test -DskipExamples

- name: Build coverage report
if: matrix.sonar-enabled
run: | # no clean
./mvnw -B -U -Pcoverage-aggregate -DskipExamples

- name: Sonar Analysis
if: ${{ success() && matrix.sonar-enabled && github.event.pull_request.head.repo.full_name == github.repository }}
run: |
./mvnw -B sonar:sonar \
-Dsonar.projectKey=AxonFramework_extension-kafka \
-Dsonar.organization=axonframework \
-Dsonar.host.url=https://sonarcloud.io \
-Dsonar.login=${{ secrets.SONAR_TOKEN }}
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
17 changes: 8 additions & 9 deletions CONTRIBUTING.md
@@ -45,16 +45,15 @@ profiles are used.
For a **regular** build, execute from your command line: `./mvnw`. This operation will run the build and execute JUnit tests
of all modules and package the resulting artifacts.

This repository contains an example project.
You can skip its build by adding `-DskipExamples` to your build command.
This repository contains an example project. You can skip its build by adding `-DskipExamples` to your build command.

There are long-running integration tests present (starting Spring Boot Application and/or running Kafka in a TestContainer), which **ARE NOT** executed by default.
A unique `itest` build is needed to run those long-running tests.
If you want to run them, please call `./mvnw -Pitest` from your command line.
When introducing additional integration tests, make sure the class name ends with `IntegrationTest`.
There are long-running integration tests present (starting a Spring Boot application and/or running Kafka in a
Testcontainer), which **ARE NOT** executed by default. A dedicated `integration-test` build is needed to run those
long-running tests. If you want to run them, please call `./mvnw -Pintegration-test` from your command line. When
introducing additional integration tests, make sure the class name ends with `IntegrationTest`.

The project uses JaCoCo to measure test coverage of the code and automatically generate coverage reports on regular
and `itest` builds. If you are interested in the overall test coverage, please run `./mvnw -Pcoverage-aggregate`
(without calling `clean`) after you run the **regular** and `itest` builds and check the resulting aggregated report
in `./coverage-report-generator/target/site/jacoco-aggregate/index.html`
and `integration-test` builds. If you are interested in the overall test coverage, please
run `./mvnw -Pcoverage clean verify`
and check the resulting aggregated report in `./coverage-report/target/site/jacoco-aggregate/index.html`

@@ -24,7 +24,7 @@
<version>4.6.0-SNAPSHOT</version>
</parent>

<artifactId>axon-kafka-coverage-report-generator</artifactId>
<artifactId>axon-kafka-coverage-report</artifactId>
<version>4.6.0-SNAPSHOT</version>

<name>Axon Framework Kafka Extension - Coverage Report Generator</name>
@@ -40,11 +40,13 @@
<groupId>org.axonframework.extensions.kafka</groupId>
<artifactId>axon-kafka</artifactId>
<version>${project.version}</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.axonframework.extensions.kafka</groupId>
<artifactId>axon-kafka-spring-boot-autoconfigure</artifactId>
<version>${project.version}</version>
<scope>runtime</scope>
</dependency>
</dependencies>

@@ -36,6 +36,7 @@
import org.slf4j.LoggerFactory;

import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
@@ -70,7 +71,7 @@ public class KafkaPublisher<K, V> {
private final ProducerFactory<K, V> producerFactory;
private final KafkaMessageConverter<K, V> messageConverter;
private final MessageMonitor<? super EventMessage<?>> messageMonitor;
private final String topic;
private final TopicResolver topicResolver;
private final long publisherAckTimeout;

/**
@@ -86,7 +87,7 @@ protected KafkaPublisher(Builder<K, V> builder) {
this.producerFactory = builder.producerFactory;
this.messageConverter = builder.messageConverter;
this.messageMonitor = builder.messageMonitor;
this.topic = builder.topic;
this.topicResolver = builder.topicResolver;
this.publisherAckTimeout = builder.publisherAckTimeout;
}

@@ -123,37 +124,42 @@ public static <K, V> Builder<K, V> builder() {
* @param event the events to publish on the Kafka broker.
* @param <T> the implementation of {@link EventMessage} send through this method
*/
@SuppressWarnings("squid:S2095") //producer needs to be closed async, not within this method
public <T extends EventMessage<?>> void send(T event) {
logger.debug("Starting event producing process for [{}].", event.getPayloadType());
Optional<String> topic = topicResolver.apply(event);
if (!topic.isPresent()) {
logger.debug("Skip publishing event for [{}] since topicFunction returned empty.", event.getPayloadType());
return;
}
UnitOfWork<?> uow = CurrentUnitOfWork.get();

MonitorCallback monitorCallback = messageMonitor.onMessageIngested(event);
try (Producer<K, V> producer = producerFactory.createProducer()) {
ConfirmationMode confirmationMode = producerFactory.confirmationMode();
Producer<K, V> producer = producerFactory.createProducer();
ConfirmationMode confirmationMode = producerFactory.confirmationMode();

if (confirmationMode.isTransactional()) {
tryBeginTxn(producer);
}

// Sends event messages to Kafka and receive a future indicating the status.
Future<RecordMetadata> publishStatus = producer.send(messageConverter.createKafkaMessage(event, topic.get()));

uow.onPrepareCommit(u -> {
if (confirmationMode.isTransactional()) {
tryBeginTxn(producer);
tryCommit(producer, monitorCallback);
} else if (confirmationMode.isWaitForAck()) {
waitForPublishAck(publishStatus, monitorCallback);
}
tryClose(producer);
});

// Sends event messages to Kafka and receive a future indicating the status.
Future<RecordMetadata> publishStatus = producer.send(messageConverter.createKafkaMessage(event, topic));

uow.onPrepareCommit(u -> {
if (confirmationMode.isTransactional()) {
tryCommit(producer, monitorCallback);
} else if (confirmationMode.isWaitForAck()) {
waitForPublishAck(publishStatus, monitorCallback);
}
tryClose(producer);
});

uow.onRollback(u -> {
if (confirmationMode.isTransactional()) {
tryRollback(producer);
}
tryClose(producer);
});
}
uow.onRollback(u -> {
if (confirmationMode.isTransactional()) {
tryRollback(producer);
}
tryClose(producer);
});
}

private void tryBeginTxn(Producer<?, ?> producer) {
@@ -248,7 +254,7 @@ public static class Builder<K, V> {
.serializer(XStreamSerializer.defaultSerializer())
.build();
private MessageMonitor<? super EventMessage<?>> messageMonitor = NoOpMessageMonitor.instance();
private String topic = DEFAULT_TOPIC;
private TopicResolver topicResolver = m -> Optional.of(DEFAULT_TOPIC);
private long publisherAckTimeout = 1_000;

/**
@@ -296,14 +302,30 @@ public Builder<K, V> messageMonitor(MessageMonitor<? super EventMessage<?>> mess
}

/**
* Set the Kafka {@code topic} to publish {@link EventMessage}s on. Defaults to {@code Axon.Events}.
* Set the Kafka {@code topic} to publish {@link EventMessage}s on. Defaults to {@code Axon.Events}. Should not
* be used together with setting the topicResolver.
*
* @param topic the Kafka {@code topic} to publish {@link EventMessage}s on
* @return the current Builder instance, for fluent interfacing
*/
public Builder<K, V> topic(String topic) {
assertThat(topic, name -> Objects.nonNull(name) && !"".equals(name), "The topic may not be null or empty");
this.topic = topic;
this.topicResolver = m -> Optional.of(topic);
return this;
}

/**
* Set the resolver used to determine the Kafka {@code topic} to publish a given {@link EventMessage} to. When the
* resolver returns {@code Optional.empty()}, the {@link EventMessage} will not be published. Defaults to always
* returning the configured topic, or {@code Axon.Events} if no topic was set. Should not be used together with
* setting the topic.
*
* @param topicResolver the resolver determining the Kafka {@code topic} to publish an {@link EventMessage} on
* @return the current Builder instance, for fluent interfacing
*/
public Builder<K, V> topicResolver(TopicResolver topicResolver) {
assertNonNull(topicResolver, "TopicResolver may not be null");
this.topicResolver = topicResolver;
return this;
}

@@ -0,0 +1,14 @@
package org.axonframework.extensions.kafka.eventhandling.producer;

import org.axonframework.eventhandling.EventMessage;

import java.util.Optional;
import java.util.function.Function;

/**
* Interface to determine if a message should be published to Kafka, and if so to which topic. If the result of the
* call is {@code Optional.empty()}, the message will not be published; otherwise the result will be used as the topic.
*/
public interface TopicResolver extends Function<EventMessage<?>, Optional<String>> {

}
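A minimal usage sketch of the new `TopicResolver` (not part of this PR's diff; the class name, topic name, and payload-name convention below are illustrative assumptions): publish only audit-related events to a dedicated topic and skip everything else by returning an empty `Optional`.

```java
import java.util.Optional;

import org.axonframework.eventhandling.EventMessage;
import org.axonframework.extensions.kafka.eventhandling.producer.TopicResolver;

// Illustrative resolver: only events whose payload type name ends with "AuditEvent" are published.
public class AuditTopicResolver implements TopicResolver {

    @Override
    public Optional<String> apply(EventMessage<?> message) {
        if (message.getPayloadType().getSimpleName().endsWith("AuditEvent")) {
            return Optional.of("audit-events"); // illustrative topic name
        }
        // An empty Optional tells KafkaPublisher#send to skip publication of this event.
        return Optional.empty();
    }
}
```

Wiring it in would then mirror the test setup further down in this diff, e.g. `KafkaPublisher.<String, byte[]>builder().producerFactory(producerFactory).topicResolver(new AuditTopicResolver()).build()`.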
@@ -38,6 +38,7 @@
import org.junit.jupiter.api.*;

import java.util.Collections;
import java.util.Optional;
import java.util.concurrent.TimeUnit;

import static org.axonframework.eventhandling.GenericEventMessage.asEventMessage;
@@ -78,7 +79,13 @@ void setUp() {
producerFactory = ProducerConfigUtil.ackProducerFactory(getBootstrapServers(), ByteArraySerializer.class);
publisher = KafkaPublisher.<String, byte[]>builder()
.producerFactory(producerFactory)
.topic(TEST_TOPIC)
.topicResolver(m -> {
if (m.getPayloadType().isAssignableFrom(String.class)) {
return Optional.of(TEST_TOPIC);
} else {
return Optional.empty();
}
})
.build();
KafkaEventPublisher<String, byte[]> sender =
KafkaEventPublisher.<String, byte[]>builder().kafkaPublisher(publisher).build();
@@ -124,6 +131,34 @@ void testPublishAndReadMessages() throws Exception {
assertTrue(stream2.hasNextAvailable(25, TimeUnit.SECONDS));
TrackedEventMessage<?> actual = stream2.nextAvailable();
assertNotNull(actual);
assertEquals("test", actual.getPayload());

stream2.close();
}

@Test
void testSkipPublishForLongPayload() throws Exception {
StreamableKafkaMessageSource<String, byte[]> streamableMessageSource =
StreamableKafkaMessageSource.<String, byte[]>builder()
.topics(Collections.singletonList(TEST_TOPIC))
.consumerFactory(consumerFactory)
.fetcher(fetcher)
.build();

BlockingStream<TrackedEventMessage<?>> stream1 = streamableMessageSource.openStream(null);
stream1.close();
BlockingStream<TrackedEventMessage<?>> stream2 = streamableMessageSource.openStream(null);

// This one will not be received, since the resolver returns an empty Optional for Long payloads
eventBus.publish(asEventMessage(42L));
// Published so we don't have to wait longer than necessary to know the Long event was skipped
eventBus.publish(asEventMessage("test"));

// The consumer may need some time to start
assertTrue(stream2.hasNextAvailable(25, TimeUnit.SECONDS));
TrackedEventMessage<?> actual = stream2.nextAvailable();
assertNotNull(actual);
assertInstanceOf(String.class, actual.getPayload(), "Long is not skipped");

stream2.close();
}