Skip to content

Commit

Permalink
Kafka - add Kafka Idempotent repository test
Browse files Browse the repository at this point in the history
Fixes #2626
  • Loading branch information
zbendhiba authored and ppalaga committed Jun 14, 2021
1 parent 040da37 commit 41098df
Show file tree
Hide file tree
Showing 4 changed files with 116 additions and 2 deletions.
34 changes: 34 additions & 0 deletions integration-tests/kafka/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,14 @@
<groupId>org.apache.camel.quarkus</groupId>
<artifactId>camel-quarkus-log</artifactId>
</dependency>
<dependency>
<groupId>org.apache.camel.quarkus</groupId>
<artifactId>camel-quarkus-mock</artifactId>
</dependency>
<dependency>
<groupId>org.apache.camel.quarkus</groupId>
<artifactId>camel-quarkus-direct</artifactId>
</dependency>
<dependency>
<groupId>org.apache.camel.quarkus</groupId>
<artifactId>camel-quarkus-integration-tests-support-kafka</artifactId>
Expand All @@ -68,6 +76,19 @@
</dependency>

<!-- The following dependencies guarantee that this module is built after them. You can update them by running `mvn process-resources -Pformat -N` from the source tree root directory -->
<dependency>
<groupId>org.apache.camel.quarkus</groupId>
<artifactId>camel-quarkus-direct-deployment</artifactId>
<version>${project.version}</version>
<type>pom</type>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>*</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.camel.quarkus</groupId>
<artifactId>camel-quarkus-kafka-deployment</artifactId>
Expand All @@ -94,6 +115,19 @@
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.camel.quarkus</groupId>
<artifactId>camel-quarkus-mock-deployment</artifactId>
<version>${project.version}</version>
<type>pom</type>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>*</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>

<profiles>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,29 @@

import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.stream.Collectors;

import javax.enterprise.context.ApplicationScoped;
import javax.inject.Inject;
import javax.inject.Named;
import javax.json.Json;
import javax.json.JsonObject;
import javax.ws.rs.Consumes;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
import javax.ws.rs.PUT;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;

import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
import org.apache.camel.ProducerTemplate;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
Expand All @@ -51,6 +60,12 @@ public class CamelKafkaResource {
@Named("kafka-producer-properties")
Properties producerProperties;

@Inject
CamelContext context;

@Inject
ProducerTemplate producerTemplate;

@Path("/{topicName}")
@POST
@Produces(MediaType.APPLICATION_JSON)
Expand Down Expand Up @@ -83,4 +98,23 @@ public JsonObject get(@PathParam("topicName") String topicName) {
.build();
}
}

@Path("idempotent/{id}")
@PUT
@Consumes(MediaType.APPLICATION_JSON)
public Response addMessage(@PathParam("id") Integer id, String message) {
producerTemplate.sendBodyAndHeader("direct:idempotent", message, "id", id);
return Response.accepted().build();
}

@Path("idempotent")
@GET
@Produces(MediaType.APPLICATION_JSON)
public List<String> getIdempotentResults() {
final MockEndpoint mockEndpoint = context.getEndpoint("mock:idempotent-results", MockEndpoint.class);
return mockEndpoint.getReceivedExchanges().stream()
.map(Exchange::getMessage)
.map(m -> m.getBody(String.class))
.collect(Collectors.toList());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,37 @@
*/
package org.apache.camel.quarkus.component.kafka;

import javax.enterprise.context.ApplicationScoped;
import javax.enterprise.inject.Produces;
import javax.inject.Named;

import io.quarkus.arc.Unremovable;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.processor.idempotent.kafka.KafkaIdempotentRepository;
import org.eclipse.microprofile.config.inject.ConfigProperty;

public class CamelKafkaRoutes extends RouteBuilder {
@ConfigProperty(name = "camel.component.kafka.brokers")
String brokers;

@Produces
@ApplicationScoped
@Unremovable
@Named("kafkaIdempotentRepository")
KafkaIdempotentRepository kafkaIdempotentRepository() {
return new KafkaIdempotentRepository("idempotent-topic", brokers);
}

@Override
public void configure() throws Exception {
from("kafka:inbound?autoOffsetReset=earliest")
.to("log:kafka")
.to("kafka:outbound");

from("direct:idempotent")
.idempotentConsumer(header("id"))
.messageIdRepositoryRef("kafkaIdempotentRepository")
.to("mock:idempotent-results")
.end();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,20 @@
*/
package org.apache.camel.quarkus.component.kafka.it;

import java.util.List;
import java.util.UUID;

import io.quarkus.test.common.QuarkusTestResource;
import io.quarkus.test.junit.QuarkusTest;
import io.restassured.RestAssured;
import io.restassured.http.ContentType;
import io.restassured.path.json.JsonPath;
import org.apache.camel.quarkus.test.support.kafka.KafkaTestResource;
import org.junit.jupiter.api.Test;

import static io.restassured.RestAssured.given;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;

@QuarkusTest
@QuarkusTestResource(KafkaTestResource.class)
Expand All @@ -35,14 +39,14 @@ public class CamelKafkaTest {
void testKafkaBridge() {
String body = UUID.randomUUID().toString();

RestAssured.given()
given()
.contentType("text/plain")
.body(body)
.post("/kafka/inbound")
.then()
.statusCode(200);

JsonPath result = RestAssured.given()
JsonPath result = given()
.get("/kafka/outbound")
.then()
.statusCode(200)
Expand All @@ -53,4 +57,22 @@ void testKafkaBridge() {
assertThat(result.getString("topicName")).isEqualTo("outbound");
assertThat(result.getString("body")).isEqualTo(body);
}

@Test
void testIndempotent() {

for (int i = 0; i < 10; i++) {
int id = i % 5;
given()
.contentType(ContentType.JSON)
.body("Test message")
.put("/kafka/idempotent/" + id)
.then()
.statusCode(202);
}

List<String> body = RestAssured.get("/kafka/idempotent").then().extract().body().as(List.class);
assertEquals(5, body.size());

}
}

0 comments on commit 41098df

Please sign in to comment.