Skip to content

Commit

Permalink
Fix MockEndpoint usage in Infinispan tests
Browse files Browse the repository at this point in the history
  • Loading branch information
jamesnetherton committed Apr 26, 2022
1 parent f73ffac commit 3b0b94c
Show file tree
Hide file tree
Showing 3 changed files with 81 additions and 108 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,13 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.IntStream;
import java.util.stream.Stream;

import javax.enterprise.context.ApplicationScoped;
import javax.inject.Inject;
Expand Down Expand Up @@ -82,13 +85,25 @@ public JsonObject inspectCamelInfinispanClientConfiguration() {
}

@Path("/aggregate")
@POST
@Consumes(MediaType.TEXT_PLAIN)
public void aggregate(@QueryParam("component") String component, String content) {
String uri = component.equals("infinispan") ? "direct:camelAggregation" : "direct:quarkusAggregation";
Map<String, Object> headers = getCommonHeaders(component);
headers.put(CORRELATOR_HEADER, CORRELATOR_HEADER);
template.sendBodyAndHeaders(uri, content, headers);
@GET
public void aggregate(@QueryParam("component") String component) throws InterruptedException {
String mockEndpointUri = component.equals("infinispan") ? "mock:camelAggregationResult"
: "mock:quarkusAggregationResult";
MockEndpoint mockEndpoint = camelContext.getEndpoint(mockEndpointUri, MockEndpoint.class);
mockEndpoint.expectedMessageCount(2);
mockEndpoint.expectedBodiesReceived(1 + 3 + 4 + 5, 6 + 7 + 20 + 21);

try {
String uri = component.equals("infinispan") ? "direct:camelAggregation" : "direct:quarkusAggregation";
Map<String, Object> headers = getCommonHeaders(component);
headers.put(CORRELATOR_HEADER, CORRELATOR_HEADER);

Stream.of(1, 3, 4, 5, 6, 7, 20, 21).forEach(value -> template.sendBodyAndHeaders(uri, value, headers));

mockEndpoint.assertIsSatisfied(15000);
} finally {
mockEndpoint.reset();
}
}

@Path("/clear")
Expand Down Expand Up @@ -141,6 +156,27 @@ public Boolean containsValue(@QueryParam("component") String component, @QueryPa
return template.requestBodyAndHeaders("direct:containsValue", value, headers, Boolean.class);
}

@Path("/event/verify")
@GET
public void listener(
@QueryParam("component") String component,
@QueryParam("mockEndpointUri") String mockEndpointUri,
String content) throws InterruptedException {
MockEndpoint mockEndpoint = camelContext.getEndpoint(mockEndpointUri, MockEndpoint.class);
mockEndpoint.expectedMessageCount(1);
mockEndpoint.message(0).header(InfinispanConstants.EVENT_TYPE).isEqualTo("CLIENT_CACHE_ENTRY_CREATED");
mockEndpoint.message(0).header(InfinispanConstants.CACHE_NAME).isNotNull();
mockEndpoint.message(0).header(InfinispanConstants.KEY).isEqualTo("the-key");

try {
Map<String, Object> headers = getCommonHeaders(component);
template.sendBodyAndHeaders("direct:put", content, headers);
mockEndpoint.assertIsSatisfied(5000);
} finally {
mockEndpoint.reset();
}
}

@Path("/get")
@GET
@Produces(MediaType.TEXT_PLAIN)
Expand Down Expand Up @@ -198,16 +234,25 @@ public void putAllAsync(@QueryParam("component") String component)
}

@Path("/putIdempotent")
@POST
@Consumes(MediaType.TEXT_PLAIN)
public void putIdempotent(
@QueryParam("component") String component,
@QueryParam("messageId") String messageId,
String content) {
@GET
public void putIdempotent(@QueryParam("component") String component) throws InterruptedException {
String mockEndpointUri = component.equals("infinispan") ? "mock:camelResultIdempotent" : "mock:quarkusResultIdempotent";
MockEndpoint mockEndpoint = camelContext.getEndpoint(mockEndpointUri, MockEndpoint.class);
mockEndpoint.expectedMessageCount(1);

String messageId = UUID.randomUUID().toString();
String uri = component.equals("infinispan") ? "direct:camelIdempotent" : "direct:quarkusIdempotent";
Map<String, Object> headers = getCommonHeaders(component);
headers.put("MessageId", messageId);
template.sendBodyAndHeaders(uri, content, headers);
try {
IntStream.of(1, 10).forEach(value -> {
Map<String, Object> headers = getCommonHeaders(component);
headers.put("MessageId", messageId);
template.sendBodyAndHeaders(uri, "Message " + value, headers);
});

mockEndpoint.assertIsSatisfied(5000);
} finally {
mockEndpoint.reset();
}
}

@Path("/putIfAbsent")
Expand Down Expand Up @@ -305,48 +350,6 @@ public Integer stats(@QueryParam("component") String component) {
return statistics.getIntStatistic(ServerStatistics.CURRENT_NR_OF_ENTRIES);
}

@Path("/mock/aggregation/results")
@GET
public void assertMockEndpointAggregationResults(@QueryParam("uri") String uri) throws InterruptedException {
MockEndpoint mockEndpoint = camelContext.getEndpoint(uri, MockEndpoint.class);
mockEndpoint.expectedMessageCount(2);
mockEndpoint.expectedBodiesReceived(1 + 3 + 4 + 5, 6 + 7 + 20 + 21);

try {
mockEndpoint.assertIsSatisfied(5000);
} finally {
mockEndpoint.reset();
}
}

@Path("/mock/event/results")
@GET
public void assertMockEndpointEventResults(@QueryParam("uri") String uri) throws InterruptedException {
MockEndpoint mockEndpoint = camelContext.getEndpoint(uri, MockEndpoint.class);
mockEndpoint.expectedMessageCount(1);
mockEndpoint.message(0).header(InfinispanConstants.EVENT_TYPE).isEqualTo("CLIENT_CACHE_ENTRY_CREATED");
mockEndpoint.message(0).header(InfinispanConstants.CACHE_NAME).isNotNull();
mockEndpoint.message(0).header(InfinispanConstants.KEY).isEqualTo("test-key");

try {
mockEndpoint.assertIsSatisfied(5000);
} finally {
mockEndpoint.reset();
}
}

@Path("/mock/idempotent/results")
@GET
public void assertMockEndpointIdempotentResults(@QueryParam("uri") String uri) throws InterruptedException {
MockEndpoint mockEndpoint = camelContext.getEndpoint(uri, MockEndpoint.class);
mockEndpoint.expectedMessageCount(1);
try {
mockEndpoint.assertIsSatisfied(5000);
} finally {
mockEndpoint.reset();
}
}

@POST
@Path("consumer/{routeId}/{enabled}")
public void manageRoute(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ public void configure() {
from("infinispan:camel?eventTypes=CLIENT_CACHE_ENTRY_CREATED")
.id("infinispan-events")
.autoStartup(false)
.to("mock:resultCreated");
.to("mock:camelResultCreated");

// Only start aggregation repository routes in JVM mode
if (!"executable".equals(System.getProperty("org.graalvm.nativeimage.kind"))) {
Expand All @@ -173,38 +173,38 @@ public void configure() {
.aggregationRepository(createAggregationRepository("infinispan"))
.aggregationStrategy(createAggregationStrategy())
.completionSize(COMPLETION_SIZE)
.to("mock:aggregationResult");
.to("mock:camelAggregationResult");

from("direct:quarkusAggregation")
.aggregate(header(CORRELATOR_HEADER))
.aggregationRepository(createAggregationRepository("infinispan-quarkus"))
.aggregationStrategy(createAggregationStrategy())
.completionSize(COMPLETION_SIZE)
.to("mock:aggregationResult");
.to("mock:quarkusAggregationResult");
}

from("direct:camelIdempotent")
.idempotentConsumer(header("MessageID"), createIdempotentRepository("infinispan"))
.to("mock:resultIdempotent");
.to("mock:camelResultIdempotent");

from("direct:quarkusIdempotent")
.idempotentConsumer(header("MessageID"), createIdempotentRepository("infinispan-quarkus"))
.to("mock:resultIdempotent");
.to("mock:quarkusResultIdempotent");

from("infinispan-quarkus:quarkus?eventTypes=CLIENT_CACHE_ENTRY_CREATED")
.id("infinispan-quarkus-events")
.autoStartup(false)
.to("mock:resultCreated");
.to("mock:quarkusResultCreated");

from("infinispan:camel?customListener=#customListener")
.id("infinispan-custom-listener")
.autoStartup(false)
.to("mock:resultCustomListener");
.to("mock:camelResultCustomListener");

from("infinispan-quarkus:quarkus?customListener=#customListener")
.id("infinispan-quarkus-custom-listener")
.autoStartup(false)
.to("mock:resultCustomListener");
.to("mock:quarkusResultCustomListener");
}

@Named("infinispan-quarkus")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@
*/
package org.apache.camel.quarkus.component.infinispan;

import java.util.UUID;
import java.util.stream.IntStream;
import java.util.stream.Stream;

import io.quarkus.test.common.QuarkusTestResource;
Expand Down Expand Up @@ -68,19 +66,9 @@ public void aggregate(String componentName) {
// the default configured ProtoStreamMarshaller, thus we avoid doing it in this test suite
Assumptions.assumeTrue(componentName.equals("infinispan"));

Stream.of(1, 3, 4, 5, 6, 7, 20, 21)
.forEach(value -> {
RestAssured.with()
.queryParam("component", componentName)
.body(value)
.post("/infinispan/aggregate")
.then()
.statusCode(204);
});

RestAssured.with()
.queryParam("uri", "mock:aggregationResult")
.get("/infinispan/mock/aggregation/results")
.queryParam("component", componentName)
.get("/infinispan/aggregate")
.then()
.statusCode(204);
}
Expand Down Expand Up @@ -203,16 +191,13 @@ public void customListener(String componentName) {
.statusCode(204);

try {
String mockEndpointUri = componentName.equals("infinispan") ? "mock:camelResultCustomListener"
: "mock:quarkusResultCustomListener";
RestAssured.with()
.queryParam("component", componentName)
.queryParam("mockEndpointUri", mockEndpointUri)
.body("Hello " + componentName)
.post("/infinispan/put")
.then()
.statusCode(204);

RestAssured.with()
.queryParam("uri", "mock:resultCustomListener")
.get("/infinispan/mock/event/results")
.get("/infinispan/event/verify")
.then()
.statusCode(204);
} finally {
Expand All @@ -234,16 +219,13 @@ public void events(String componentName) {
.statusCode(204);

try {
String mockEndpointUri = componentName.equals("infinispan") ? "mock:camelResultCreated"
: "mock:quarkusResultCreated";
RestAssured.with()
.queryParam("component", componentName)
.queryParam("mockEndpointUri", mockEndpointUri)
.body("Hello " + componentName)
.post("/infinispan/put")
.then()
.statusCode(204);

RestAssured.with()
.queryParam("uri", "mock:resultCreated")
.get("/infinispan/mock/event/results")
.get("/infinispan/event/verify")
.then()
.statusCode(204);
} finally {
Expand Down Expand Up @@ -283,21 +265,9 @@ public void getOrDefault(String componentName) {
@ParameterizedTest
@MethodSource("componentNames")
public void idempotent(String componentName) {
String messageId = UUID.randomUUID().toString();

IntStream.of(1, 10).forEach(value -> {
RestAssured.with()
.queryParam("component", componentName)
.queryParam("messageId", messageId)
.body("Message " + value)
.post("/infinispan/putIdempotent")
.then()
.statusCode(204);
});

RestAssured.with()
.queryParam("uri", "mock:resultIdempotent")
.get("/infinispan/mock/idempotent/results")
.queryParam("component", componentName)
.get("/infinispan/putIdempotent")
.then()
.statusCode(204);
}
Expand Down

0 comments on commit 3b0b94c

Please sign in to comment.