Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -85,12 +85,15 @@ public abstract class KafkaStreamsApplication implements Runnable, AutoCloseable
@CommandLine.Option(names = "--streams-config", split = ",", description = "Additional Kafka Streams properties")
private Map<String, String> streamsConfig = new HashMap<>();

@CommandLine.Option(names = "--input-topic", description = "Input topic")
protected String inputTopic = "";
@CommandLine.Option(names = "--input-topics", description = "Input topics", split = ",")
protected List<String> inputTopics = new ArrayList<>();

@CommandLine.Option(names = "--output-topic", description = "Output topic")
protected String outputTopic = "";

@CommandLine.Option(names = "--error-topic", description = "Error topic (default: ${DEFAULT-VALUE}")
protected String errorTopic = "error_topic";

private KafkaStreams streams;

private static String[] addEnvironmentVariablesArguments(final String[] args) {
Expand Down Expand Up @@ -226,9 +229,9 @@ protected static void runResetter(final String inputTopics, final String brokers
}

protected void cleanUp() {
if (!this.inputTopic.isBlank()) {
runResetter(this.inputTopic, this.brokers, this.getUniqueAppId());
}
this.inputTopics.stream()
.filter(topic -> !topic.isBlank())
.forEach(topic -> runResetter(topic, this.brokers, this.getUniqueAppId()));
this.streams.cleanUp();
try {
Thread.sleep(RESET_SLEEP_MS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
import picocli.CommandLine;

class WordCountTest {
private static final String[] ARGS = {"--input-topic", "Input", "--output-topic", "Output",
private static final String[] ARGS = {"--input-topics", "Input,Input2", "--output-topic", "Output",
"--brokers", "localhost:9092", "--schema-registry-url", "registryUrl", "--streams-config",
"test.ack=1,test1.ack=2"};
private final WordCount app = CommandLine.populateCommand(new WordCount(), ARGS);
Expand Down Expand Up @@ -62,4 +62,9 @@ void shouldSetKafkaProperties() {
assertThat(this.app.getKafkaProperties().getProperty("test1.ack")).isEqualTo("2");
}

@Test
void shouldParseMultipleInputTopics() {
assertThat(this.app.getInputTopics())
.containsExactly("Input", "Input2");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ void setup() {
this.mirror = new Mirror();
this.mirror.setSchemaRegistryUrl(this.schemaRegistryMockExtension.getUrl());
final String inputTopicName = "input";
this.mirror.setInputTopic(inputTopicName);
this.mirror.setInputTopics(List.of(inputTopicName));
final String outputTopicName = "output";
this.mirror.setOutputTopic(outputTopicName);
this.mirror.setBrokers(this.kafkaCluster.getBrokerList());
Expand All @@ -70,7 +70,7 @@ void setup() {
"default.key.serde", "org.apache.kafka.common.serialization.Serdes$StringSerde"));

this.kafkaCluster.createTopic(TopicConfig.forTopic(this.mirror.getOutputTopic()).useDefaults());
this.kafkaCluster.createTopic(TopicConfig.forTopic(this.mirror.getInputTopic()).useDefaults());
this.kafkaCluster.createTopic(TopicConfig.forTopic(this.mirror.getInputTopics().get(0)).useDefaults());
}

@AfterEach
Expand All @@ -88,7 +88,7 @@ void shouldReprocessOnFirstRun() {
@Test
void shouldReprocessAlreadySeenRecords() throws InterruptedException {
final SendValuesTransactional<String> sendRequest =
SendValuesTransactional.inTransaction(this.mirror.getInputTopic(),
SendValuesTransactional.inTransaction(this.mirror.getInputTopics().get(0),
Arrays.asList("a", "b", "c")).useDefaults();
this.kafkaCluster.send(sendRequest);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,13 @@
public class Mirror extends KafkaStreamsApplication {
@Override
public void buildTopology(final StreamsBuilder builder) {
final KStream<String, String> input = builder.stream(this.getInputTopic());
final KStream<String, String> input = builder.stream(this.getInputTopics().get(0));
input.to(this.getOutputTopic());
}

@Override
public String getUniqueAppId() {
return this.getClass().getSimpleName() + "-" + this.getInputTopic() + "-" + this.getOutputTopic();
return this.getClass().getSimpleName() + "-" + String.join("-", this.getInputTopics()) +
"-" + this.getOutputTopic();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public void buildTopology(final StreamsBuilder builder) {
final Serde<String> stringSerde = Serdes.String();
final Serde<Long> longSerde = Serdes.Long();

final KStream<String, String> textLines = builder.stream(this.inputTopic);
final KStream<String, String> textLines = builder.stream(this.inputTopics.get(0));

final Pattern pattern = Pattern.compile("\\W+", Pattern.UNICODE_CHARACTER_CLASS);
final KTable<String, Long> wordCounts = textLines
Expand All @@ -61,7 +61,8 @@ public void buildTopology(final StreamsBuilder builder) {

@Override
public String getUniqueAppId() {
return this.getClass().getSimpleName() + "-" + this.getInputTopic() + "-" + this.getOutputTopic();
return this.getClass().getSimpleName() + "-" + String.join("-", this.getInputTopics()) +
"-" + this.getOutputTopic();
}

public Properties getKafkaProperties() {
Expand Down