diff --git a/src/main/java/com/bakdata/common_kafka_streams/KafkaStreamsApplication.java b/src/main/java/com/bakdata/common_kafka_streams/KafkaStreamsApplication.java index 3af75efd2..f4a09a2b7 100644 --- a/src/main/java/com/bakdata/common_kafka_streams/KafkaStreamsApplication.java +++ b/src/main/java/com/bakdata/common_kafka_streams/KafkaStreamsApplication.java @@ -85,12 +85,15 @@ public abstract class KafkaStreamsApplication implements Runnable, AutoCloseable @CommandLine.Option(names = "--streams-config", split = ",", description = "Additional Kafka Streams properties") private Map streamsConfig = new HashMap<>(); - @CommandLine.Option(names = "--input-topic", description = "Input topic") - protected String inputTopic = ""; + @CommandLine.Option(names = "--input-topics", description = "Input topics", split = ",") + protected List inputTopics = new ArrayList<>(); @CommandLine.Option(names = "--output-topic", description = "Output topic") protected String outputTopic = ""; + @CommandLine.Option(names = "--error-topic", description = "Error topic (default: ${DEFAULT-VALUE}") + protected String errorTopic = "error_topic"; + private KafkaStreams streams; private static String[] addEnvironmentVariablesArguments(final String[] args) { @@ -226,9 +229,9 @@ protected static void runResetter(final String inputTopics, final String brokers } protected void cleanUp() { - if (!this.inputTopic.isBlank()) { - runResetter(this.inputTopic, this.brokers, this.getUniqueAppId()); - } + this.inputTopics.stream() + .filter(topic -> !topic.isBlank()) + .forEach(topic -> runResetter(topic, this.brokers, this.getUniqueAppId())); this.streams.cleanUp(); try { Thread.sleep(RESET_SLEEP_MS); diff --git a/src/test/java/com/bakdata/common_kafka_streams/WordCountTest.java b/src/test/java/com/bakdata/common_kafka_streams/WordCountTest.java index ead808850..466c19613 100644 --- a/src/test/java/com/bakdata/common_kafka_streams/WordCountTest.java +++ b/src/test/java/com/bakdata/common_kafka_streams/WordCountTest.java @@ -34,7 +34,7 @@ import picocli.CommandLine; class WordCountTest { - private static final String[] ARGS = {"--input-topic", "Input", "--output-topic", "Output", + private static final String[] ARGS = {"--input-topics", "Input,Input2", "--output-topic", "Output", "--brokers", "localhost:9092", "--schema-registry-url", "registryUrl", "--streams-config", "test.ack=1,test1.ack=2"}; private final WordCount app = CommandLine.populateCommand(new WordCount(), ARGS); @@ -62,4 +62,9 @@ void shouldSetKafkaProperties() { assertThat(this.app.getKafkaProperties().getProperty("test1.ack")).isEqualTo("2"); } + @Test + void shouldParseMultipleInputTopics() { + assertThat(this.app.getInputTopics()) + .containsExactly("Input", "Input2"); + } } diff --git a/src/test/java/com/bakdata/common_kafka_streams/integration/ReprocessingTest.java b/src/test/java/com/bakdata/common_kafka_streams/integration/ReprocessingTest.java index 00403ee82..3bcd6fcf4 100644 --- a/src/test/java/com/bakdata/common_kafka_streams/integration/ReprocessingTest.java +++ b/src/test/java/com/bakdata/common_kafka_streams/integration/ReprocessingTest.java @@ -60,7 +60,7 @@ void setup() { this.mirror = new Mirror(); this.mirror.setSchemaRegistryUrl(this.schemaRegistryMockExtension.getUrl()); final String inputTopicName = "input"; - this.mirror.setInputTopic(inputTopicName); + this.mirror.setInputTopics(List.of(inputTopicName)); final String outputTopicName = "output"; this.mirror.setOutputTopic(outputTopicName); this.mirror.setBrokers(this.kafkaCluster.getBrokerList()); @@ -70,7 +70,7 @@ void setup() { "default.key.serde", "org.apache.kafka.common.serialization.Serdes$StringSerde")); this.kafkaCluster.createTopic(TopicConfig.forTopic(this.mirror.getOutputTopic()).useDefaults()); - this.kafkaCluster.createTopic(TopicConfig.forTopic(this.mirror.getInputTopic()).useDefaults()); + this.kafkaCluster.createTopic(TopicConfig.forTopic(this.mirror.getInputTopics().get(0)).useDefaults()); } @AfterEach @@ -88,7 +88,7 @@ void shouldReprocessOnFirstRun() { @Test void shouldReprocessAlreadySeenRecords() throws InterruptedException { final SendValuesTransactional sendRequest = - SendValuesTransactional.inTransaction(this.mirror.getInputTopic(), + SendValuesTransactional.inTransaction(this.mirror.getInputTopics().get(0), Arrays.asList("a", "b", "c")).useDefaults(); this.kafkaCluster.send(sendRequest); diff --git a/src/test/java/com/bakdata/common_kafka_streams/test_applications/Mirror.java b/src/test/java/com/bakdata/common_kafka_streams/test_applications/Mirror.java index 07d9504a2..bbc223bf9 100644 --- a/src/test/java/com/bakdata/common_kafka_streams/test_applications/Mirror.java +++ b/src/test/java/com/bakdata/common_kafka_streams/test_applications/Mirror.java @@ -33,12 +33,13 @@ public class Mirror extends KafkaStreamsApplication { @Override public void buildTopology(final StreamsBuilder builder) { - final KStream input = builder.stream(this.getInputTopic()); + final KStream input = builder.stream(this.getInputTopics().get(0)); input.to(this.getOutputTopic()); } @Override public String getUniqueAppId() { - return this.getClass().getSimpleName() + "-" + this.getInputTopic() + "-" + this.getOutputTopic(); + return this.getClass().getSimpleName() + "-" + String.join("-", this.getInputTopics()) + + "-" + this.getOutputTopic(); } } diff --git a/src/test/java/com/bakdata/common_kafka_streams/test_applications/WordCount.java b/src/test/java/com/bakdata/common_kafka_streams/test_applications/WordCount.java index 351c3b3c1..5e64dc75d 100644 --- a/src/test/java/com/bakdata/common_kafka_streams/test_applications/WordCount.java +++ b/src/test/java/com/bakdata/common_kafka_streams/test_applications/WordCount.java @@ -48,7 +48,7 @@ public void buildTopology(final StreamsBuilder builder) { final Serde stringSerde = Serdes.String(); final Serde longSerde = Serdes.Long(); - final KStream textLines = builder.stream(this.inputTopic); + final KStream textLines = builder.stream(this.inputTopics.get(0)); final Pattern pattern = Pattern.compile("\\W+", Pattern.UNICODE_CHARACTER_CLASS); final KTable wordCounts = textLines @@ -61,7 +61,8 @@ public void buildTopology(final StreamsBuilder builder) { @Override public String getUniqueAppId() { - return this.getClass().getSimpleName() + "-" + this.getInputTopic() + "-" + this.getOutputTopic(); + return this.getClass().getSimpleName() + "-" + String.join("-", this.getInputTopics()) + + "-" + this.getOutputTopic(); } public Properties getKafkaProperties() {