Skip to content

Commit

Permalink
Add integration-tests-support-kafka module
Browse files Browse the repository at this point in the history
  • Loading branch information
jamesnetherton committed Apr 23, 2021
1 parent 2574836 commit e39f071
Show file tree
Hide file tree
Showing 19 changed files with 299 additions and 240 deletions.
76 changes: 76 additions & 0 deletions integration-tests-support/kafka/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<groupId>org.apache.camel.quarkus</groupId>
<artifactId>camel-quarkus-integration-tests-support</artifactId>
<version>1.9.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>camel-quarkus-integration-tests-support-kafka</artifactId>
<name>Camel Quarkus :: Integration Tests :: Support :: Kafka</name>

<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.apache.camel.quarkus</groupId>
<artifactId>camel-quarkus-bom-test</artifactId>
<version>${project.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>

<dependencies>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-kafka-client</artifactId>
</dependency>
<dependency>
<groupId>org.apache.camel.quarkus</groupId>
<artifactId>camel-quarkus-integration-test-support</artifactId>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>kafka</artifactId>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.jboss.jandex</groupId>
<artifactId>jandex-maven-plugin</artifactId>
<executions>
<execution>
<id>make-index</id>
<goals>
<goal>jandex</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>

Original file line number Diff line number Diff line change
Expand Up @@ -14,47 +14,48 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.camel.quarkus.component.kafka;
package org.apache.camel.quarkus.test.support.kafka;

import java.util.Collections;
import java.util.Properties;

import javax.enterprise.context.ApplicationScoped;
import javax.inject.Named;

import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;

public final class CamelKafkaSupport {
private CamelKafkaSupport() {
}
public class KafkaProducers {

public static KafkaConsumer<Integer, String> createConsumer(String topicName) {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, System.getProperty("camel.component.kafka.brokers"));
props.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
@ApplicationScoped
@Named("kafka-consumer-properties")
public Properties kafkaConsumerProperties() {
Properties props = createBaseConfiguration();
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

KafkaConsumer<Integer, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList(topicName));

return consumer;
return props;
}

public static Producer<Integer, String> createProducer() {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, System.getProperty("camel.component.kafka.brokers"));
props.put(ProducerConfig.CLIENT_ID_CONFIG, "test-consumer");
@ApplicationScoped
@Named("kafka-producer-properties")
public Properties kafkaProducerProperties() {
Properties props = createBaseConfiguration();
props.put(ProducerConfig.CLIENT_ID_CONFIG, "camel-quarkus-kafka-client");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
return props;
}

return new KafkaProducer<>(props);
private Properties createBaseConfiguration() {
Properties props = new Properties();
props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, KafkaTestSupport.getBootstrapServers());
props.put(CommonClientConfigs.GROUP_ID_CONFIG, "camel-quarkus-group");
return props;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.camel.quarkus.component.kafka.it;
package org.apache.camel.quarkus.test.support.kafka;

import java.util.Collections;
import java.util.Map;
Expand All @@ -27,9 +27,10 @@
import org.testcontainers.utility.DockerImageName;
import org.testcontainers.utility.TestcontainersConfiguration;

public class CamelKafkaTestResource implements QuarkusTestResourceLifecycleManager {
private static final Logger LOGGER = LoggerFactory.getLogger(CamelKafkaTestResource.class);
private static final String CONFLUENT_PLATFORM_VERSION = "5.4.3";
public class KafkaTestResource implements QuarkusTestResourceLifecycleManager {

protected static final DockerImageName KAFKA_IMAGE_NAME = DockerImageName.parse("confluentinc/cp-kafka").withTag("5.4.3");
private static final Logger LOGGER = LoggerFactory.getLogger(KafkaTestResource.class);

private KafkaContainer container;

Expand All @@ -38,8 +39,7 @@ public Map<String, String> start() {
LOGGER.info(TestcontainersConfiguration.getInstance().toString());

try {
DockerImageName imageName = DockerImageName.parse("confluentinc/cp-kafka").withTag(CONFLUENT_PLATFORM_VERSION);
container = new KafkaContainer(imageName)
container = new KafkaContainer(KAFKA_IMAGE_NAME)
/* Added container startup logging because of https://github.com/apache/camel-quarkus/issues/2461 */
.withLogConsumer(frame -> System.out.print(frame.getUtf8String()))
.withEmbeddedZookeeper()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.camel.quarkus.test.support.kafka;

import java.util.Optional;
import java.util.Properties;

import org.apache.kafka.clients.CommonClientConfigs;
import org.eclipse.microprofile.config.Config;
import org.eclipse.microprofile.config.ConfigProvider;

public final class KafkaTestSupport {

public static String getBootstrapServers() {
return getKafkaConfigValue(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG);
}

public static String getKafkaConfigValue(String key) {
Config config = ConfigProvider.getConfig();
Optional<String> optional = config.getOptionalValue(key, String.class);

if (!optional.isPresent()) {
optional = config.getOptionalValue("kafka." + key, String.class);
}

if (!optional.isPresent() && key.equals(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG)) {
optional = config.getOptionalValue("camel.component.kafka.brokers", String.class);
}

if (!optional.isPresent()) {
throw new IllegalStateException("Property " + key + " has not been set");
}

return optional.get();
}

public static void setKafkaConfigProperty(Properties props, String key) {
props.put(key, getKafkaConfigValue(key));
}

public static void setKafkaConfigFromProperty(Properties props, String key, String valueKey) {
props.put(key, getKafkaConfigValue(valueKey));
}
}
1 change: 1 addition & 0 deletions integration-tests-support/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
<module>custom-routes-collector</module>
<module>custom-type-converter</module>
<module>custom-main-listener</module>
<module>kafka</module>
<module>process-executor-support</module>
<module>test-support</module>
<module>mock-backend</module>
Expand Down
9 changes: 4 additions & 5 deletions integration-tests/kafka-sasl/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@
<groupId>org.apache.camel.quarkus</groupId>
<artifactId>camel-quarkus-log</artifactId>
</dependency>
<dependency>
<groupId>org.apache.camel.quarkus</groupId>
<artifactId>camel-quarkus-integration-tests-support-kafka</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-resteasy</artifactId>
Expand All @@ -61,11 +65,6 @@
<artifactId>rest-assured</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.camel.quarkus</groupId>
<artifactId>camel-quarkus-integration-test-support</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>kafka</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,12 @@
package org.apache.camel.quarkus.kafka.sasl;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import javax.enterprise.context.ApplicationScoped;
import javax.inject.Inject;
import javax.inject.Named;
import javax.json.Json;
import javax.json.JsonObject;
import javax.ws.rs.GET;
Expand All @@ -28,21 +32,38 @@
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;

import org.apache.camel.quarkus.test.support.kafka.KafkaTestSupport;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.config.SaslConfigs;

@Path("/test")
@Path("/kafka-sasl")
@ApplicationScoped
public class KafkaSaslResource {

@Path("/kafka/{topicName}")
@Inject
@Named("kafka-consumer-properties")
Properties consumerProperties;

@Inject
@Named("kafka-producer-properties")
Properties producerProperties;

@Path("/{topicName}")
@POST
@Produces(MediaType.APPLICATION_JSON)
public JsonObject post(@PathParam("topicName") String topicName, String message) throws Exception {
try (Producer<Integer, String> producer = KafkaSupport.createProducer()) {
Properties props = (Properties) producerProperties.clone();
KafkaTestSupport.setKafkaConfigProperty(props, CommonClientConfigs.SECURITY_PROTOCOL_CONFIG);
KafkaTestSupport.setKafkaConfigProperty(props, SaslConfigs.SASL_MECHANISM);
KafkaTestSupport.setKafkaConfigProperty(props, SaslConfigs.SASL_JAAS_CONFIG);

try (Producer<Integer, String> producer = new KafkaProducer<>(props)) {
RecordMetadata meta = producer.send(new ProducerRecord<>(topicName, 1, message)).get();

return Json.createObjectBuilder()
Expand All @@ -53,11 +74,18 @@ public JsonObject post(@PathParam("topicName") String topicName, String message)
}
}

@Path("/kafka/{topicName}")
@Path("/{topicName}")
@GET
@Produces(MediaType.APPLICATION_JSON)
public JsonObject get(@PathParam("topicName") String topicName) {
try (KafkaConsumer<Integer, String> consumer = KafkaSupport.createConsumer(topicName)) {
Properties props = (Properties) consumerProperties.clone();
KafkaTestSupport.setKafkaConfigProperty(props, CommonClientConfigs.SECURITY_PROTOCOL_CONFIG);
KafkaTestSupport.setKafkaConfigProperty(props, SaslConfigs.SASL_MECHANISM);
KafkaTestSupport.setKafkaConfigProperty(props, SaslConfigs.SASL_JAAS_CONFIG);

try (KafkaConsumer<Integer, String> consumer = new KafkaConsumer<>(props)) {
consumer.subscribe(Collections.singletonList(topicName));

ConsumerRecord<Integer, String> record = consumer.poll(Duration.ofSeconds(60)).iterator().next();
return Json.createObjectBuilder()
.add("topicName", record.topic())
Expand Down

0 comments on commit e39f071

Please sign in to comment.