Skip to content

Commit

Permalink
Improve MicroProfile Fault Tolerance extension test coverage
Browse files Browse the repository at this point in the history
Fixes #3677
  • Loading branch information
jamesnetherton committed Mar 29, 2022
1 parent ef15c62 commit 4e1ff26
Show file tree
Hide file tree
Showing 6 changed files with 299 additions and 35 deletions.
44 changes: 39 additions & 5 deletions integration-tests/microprofile/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -33,23 +33,31 @@
<dependencies>
<dependency>
<groupId>org.apache.camel.quarkus</groupId>
<artifactId>camel-quarkus-microprofile-fault-tolerance</artifactId>
<artifactId>camel-quarkus-bean</artifactId>
</dependency>
<dependency>
<groupId>org.apache.camel.quarkus</groupId>
<artifactId>camel-quarkus-microprofile-health</artifactId>
<artifactId>camel-quarkus-direct</artifactId>
</dependency>
<dependency>
<groupId>org.apache.camel.quarkus</groupId>
<artifactId>camel-quarkus-microprofile-metrics</artifactId>
<artifactId>camel-quarkus-log</artifactId>
</dependency>
<dependency>
<groupId>org.apache.camel.quarkus</groupId>
<artifactId>camel-quarkus-direct</artifactId>
<artifactId>camel-quarkus-microprofile-fault-tolerance</artifactId>
</dependency>
<dependency>
<groupId>org.apache.camel.quarkus</groupId>
<artifactId>camel-quarkus-log</artifactId>
<artifactId>camel-quarkus-microprofile-health</artifactId>
</dependency>
<dependency>
<groupId>org.apache.camel.quarkus</groupId>
<artifactId>camel-quarkus-microprofile-metrics</artifactId>
</dependency>
<dependency>
<groupId>org.apache.camel.quarkus</groupId>
<artifactId>camel-quarkus-mock</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
Expand Down Expand Up @@ -112,6 +120,19 @@
</activation>
<dependencies>
<!-- The following dependencies guarantee that this module is built after them. You can update them by running `mvn process-resources -Pformat -N` from the source tree root directory -->
<dependency>
<groupId>org.apache.camel.quarkus</groupId>
<artifactId>camel-quarkus-bean-deployment</artifactId>
<version>${project.version}</version>
<type>pom</type>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>*</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.camel.quarkus</groupId>
<artifactId>camel-quarkus-direct-deployment</artifactId>
Expand Down Expand Up @@ -177,6 +198,19 @@
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.camel.quarkus</groupId>
<artifactId>camel-quarkus-mock</artifactId>
<version>${project.version}</version>
<type>pom</type>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>*</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
</profile>
</profiles>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.camel.quarkus.component.microprofile.it.faulttolerance;

import java.util.concurrent.atomic.AtomicInteger;

import javax.enterprise.context.ApplicationScoped;

import org.eclipse.microprofile.faulttolerance.CircuitBreaker;
import org.eclipse.microprofile.faulttolerance.Fallback;
import org.eclipse.microprofile.faulttolerance.Timeout;

import static org.apache.camel.quarkus.component.microprofile.it.faulttolerance.MicroProfileFaultToleranceRoutes.EXCEPTION_MESSAGE;
import static org.apache.camel.quarkus.component.microprofile.it.faulttolerance.MicroProfileFaultToleranceRoutes.FALLBACK_RESULT;
import static org.apache.camel.quarkus.component.microprofile.it.faulttolerance.MicroProfileFaultToleranceRoutes.RESULT;

@ApplicationScoped
public class GreetingBean {

@Fallback(fallbackMethod = "fallbackGreeting")
public String greetWithFallback() {
AtomicInteger counter = MicroProfileFaultToleranceHelper.getCounter("beanFallback");
if (counter.incrementAndGet() == 1) {
throw new IllegalStateException(EXCEPTION_MESSAGE);
}
return RESULT;
}

@Timeout(250)
public String greetWithDelay() throws InterruptedException {
AtomicInteger counter = MicroProfileFaultToleranceHelper.getCounter("beanTimeout");
if (counter.incrementAndGet() == 1) {
Thread.sleep(500);
return "Nothing to see here, method invocation timed out!";
}
return RESULT;
}

@CircuitBreaker(failureRatio = 1.0, requestVolumeThreshold = 1, delay = 0)
public String greetWithCircuitBreaker() {
AtomicInteger counter = MicroProfileFaultToleranceHelper.getCounter("beanThreshold");
if (counter.incrementAndGet() == 1) {
throw new IllegalStateException(EXCEPTION_MESSAGE);
}
return RESULT;
}

public String fallbackGreeting() {
return FALLBACK_RESULT;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.camel.quarkus.component.microprofile.it.faulttolerance;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

public class MicroProfileFaultToleranceHelper {

private static final Map<String, AtomicInteger> COUNTERS = new ConcurrentHashMap();

private MicroProfileFaultToleranceHelper() {
// Utility class
}

public static AtomicInteger getCounter(String name) {
return COUNTERS.computeIfAbsent(name, key -> new AtomicInteger());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,23 +16,47 @@
*/
package org.apache.camel.quarkus.component.microprofile.it.faulttolerance;

import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicInteger;

import javax.enterprise.context.ApplicationScoped;
import javax.inject.Inject;
import javax.inject.Named;

import org.apache.camel.builder.RouteBuilder;
import org.eclipse.microprofile.faulttolerance.exceptions.TimeoutException;

@ApplicationScoped
public class MicroProfileFaultToleranceRoutes extends RouteBuilder {

public static final String EXCEPTION_MESSAGE = "Simulated Exception";
public static final String FALLBACK_RESULT = "Fallback response";
public static final String RESULT = "Hello Camel Quarkus MicroProfile Fault Tolerance";
private static final AtomicInteger COUNTER = new AtomicInteger();
private static final AtomicInteger TIMEOUT_COUNTER = new AtomicInteger();

@Inject
GreetingBean greetingBean;

@Override
public void configure() throws Exception {
from("direct:faultTolerance")
from("direct:faultToleranceWithBulkhead")
.circuitBreaker()
.faultToleranceConfiguration().bulkheadEnabled(true).end()
.process(exchange -> {
if (COUNTER.incrementAndGet() == 1) {
AtomicInteger counter = MicroProfileFaultToleranceHelper.getCounter("bulkhead");
if (counter.incrementAndGet() == 1) {
throw new IllegalStateException(EXCEPTION_MESSAGE);
}
exchange.getMessage().setBody(RESULT);
})
.onFallback()
.setBody().constant(FALLBACK_RESULT)
.end();

from("direct:faultToleranceWithFallback")
.circuitBreaker()
.process(exchange -> {
AtomicInteger counter = MicroProfileFaultToleranceHelper.getCounter("fallback");
if (counter.incrementAndGet() == 1) {
throw new IllegalStateException("Simulated Exception");
}
exchange.getMessage().setBody(RESULT);
Expand All @@ -41,18 +65,72 @@ public void configure() throws Exception {
.setBody().constant(FALLBACK_RESULT)
.end();

from("direct:faultToleranceWithThreshold")
.circuitBreaker()
.faultToleranceConfiguration().failureRatio(100).successThreshold(1).requestVolumeThreshold(1).end()
.process(exchange -> {
AtomicInteger counter = MicroProfileFaultToleranceHelper.getCounter("threshold");
if (counter.incrementAndGet() == 1) {
throw new IllegalStateException("Simulated Exception");
}
exchange.getMessage().setBody("Nothing to see here. Circuit breaker is open...");
})
.end()
.setBody().simple(RESULT);

from("direct:faultToleranceWithTimeout")
.circuitBreaker()
.faultToleranceConfiguration().timeoutEnabled(true).timeoutDuration(500).end()
.process(exchange -> {
if (TIMEOUT_COUNTER.incrementAndGet() == 1) {
AtomicInteger counter = MicroProfileFaultToleranceHelper.getCounter("timeout");
if (counter.incrementAndGet() == 1) {
Thread.sleep(1000);
}
exchange.getMessage().setBody("Regular hi " + exchange.getMessage().getBody(String.class));
exchange.getMessage().setBody(RESULT);
})
.onFallback()
.setBody().simple("Sorry ${body}, had to fallback!")
.setBody().simple(FALLBACK_RESULT)
.end();

from("direct:faultToleranceWithTimeoutCustomExecutor")
.circuitBreaker()
.faultToleranceConfiguration().timeoutEnabled(true).timeoutScheduledExecutorService("myThreadPool")
.timeoutDuration(500).end()
.process(exchange -> {
AtomicInteger counter = MicroProfileFaultToleranceHelper.getCounter("timeoutCustomExecutor");
if (counter.incrementAndGet() == 1) {
Thread.sleep(1000);
}
exchange.getMessage().setBody(RESULT);
})
.onFallback()
.setBody().simple(FALLBACK_RESULT)
.end();

from("direct:inheritErrorHandler")
.errorHandler(deadLetterChannel("mock:dead").maximumRedeliveries(3).redeliveryDelay(0))
.circuitBreaker().inheritErrorHandler(true)
.to("mock:start")
.throwException(new IllegalArgumentException(EXCEPTION_MESSAGE)).end()
.to("mock:end");

from("direct:circuitBreakerBean")
.bean(greetingBean, "greetWithCircuitBreaker");

from("direct:fallbackBean")
.bean(greetingBean, "greetWithFallback");

from("direct:timeoutBean")
.doTry()
.bean(greetingBean, "greetWithDelay")
.doCatch(TimeoutException.class)
.setBody().constant(FALLBACK_RESULT)
.end();
}

@Named("myThreadPool")
public ScheduledExecutorService myThreadPool() {
return getCamelContext().getExecutorServiceManager()
.newScheduledThreadPool(this, "myThreadPool", 2);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,18 +23,53 @@
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;

import org.apache.camel.CamelContext;
import org.apache.camel.ProducerTemplate;
import org.apache.camel.component.mock.MockEndpoint;

@Path("/microprofile-fault-tolerance")
public class MicroprofileFaultToleranceResource {

@Inject
ProducerTemplate producerTemplate;

@Inject
CamelContext context;

@Path("/route/{route}")
@POST
@Produces(MediaType.TEXT_PLAIN)
public String triggerFaultToleranceRoute(String body, @PathParam("route") String route) {
return producerTemplate.requestBody("direct:" + route, body, String.class);
public String triggerFaultToleranceRoute(@PathParam("route") String route) {
return producerTemplate.requestBody("direct:" + route, null, String.class);
}

@Path("/faultToleranceWithThreshold/{route}")
@POST
@Produces(MediaType.TEXT_PLAIN)
public String faultToleranceWithThreshold(@PathParam("route") String route) {
try {
return producerTemplate.requestBody("direct:" + route, null, String.class);
} catch (Exception e) {
return e.getCause().getMessage();
}
}

@Path("/inheritErrorHandler")
@POST
public void inheritErrorHandler() throws Exception {
MockEndpoint start = context.getEndpoint("mock:start", MockEndpoint.class);
start.expectedMessageCount(4);

MockEndpoint end = context.getEndpoint("mock:end", MockEndpoint.class);
end.expectedMessageCount(0);

MockEndpoint dead = context.getEndpoint("mock:dead", MockEndpoint.class);
dead.expectedMessageCount(1);

producerTemplate.requestBody("direct:inheritErrorHandler", null, String.class);

start.assertIsSatisfied(5000);
end.assertIsSatisfied(5000);
dead.assertIsSatisfied(5000);
}
}

0 comments on commit 4e1ff26

Please sign in to comment.