Permalink
Browse files

CAMEL-5316: Failover EIP now detects shutdown in progress and breaks …

…out from failover loop.

git-svn-id: https://svn.apache.org/repos/asf/camel/trunk@1343704 13f79535-47bb-0310-9956-ffa450edef68
  • Loading branch information...
1 parent cad5740 commit 78d83eae75f82656a6e0c4b42fd12b7c889c5d1c @davsclaus davsclaus committed May 29, 2012
@@ -17,10 +17,13 @@
package org.apache.camel.processor.loadbalancer;
import java.util.List;
+import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.camel.AsyncCallback;
import org.apache.camel.AsyncProcessor;
+import org.apache.camel.CamelContext;
+import org.apache.camel.CamelContextAware;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.Traceable;
@@ -36,9 +39,10 @@
* as the failover load balancer is a specialized pipeline. So the trick is to keep doing the same as the
* pipeline to ensure it works the same and the async routing engine is flawless.
*/
-public class FailOverLoadBalancer extends LoadBalancerSupport implements Traceable {
+public class FailOverLoadBalancer extends LoadBalancerSupport implements Traceable, CamelContextAware {
private final List<Class<?>> exceptions;
+ private CamelContext camelContext;
private boolean roundRobin;
private int maximumFailoverAttempts = -1;
@@ -60,6 +64,16 @@ public FailOverLoadBalancer(List<Class<?>> exceptions) {
}
}
+ @Override
+ public CamelContext getCamelContext() {
+ return camelContext;
+ }
+
+ @Override
+ public void setCamelContext(CamelContext camelContext) {
+ this.camelContext = camelContext;
+ }
+
public List<Class<?>> getExceptions() {
return exceptions;
}
@@ -113,6 +127,16 @@ protected boolean shouldFailOver(Exchange exchange) {
return answer;
}
+ @Override
+ public boolean isRunAllowed() {
+ // determine if we can still run, or the camel context is forcing a shutdown
+ boolean forceShutdown = camelContext.getShutdownStrategy().forceShutdown(this);
+ if (forceShutdown) {
+ log.trace("Run not allowed as ShutdownStrategy is forcing shutting down");
+ }
+ return !forceShutdown && super.isRunAllowed();
+ }
+
public boolean process(final Exchange exchange, final AsyncCallback callback) {
final List<Processor> processors = getProcessors();
@@ -133,6 +157,18 @@ public boolean process(final Exchange exchange, final AsyncCallback callback) {
log.trace("Failover starting with endpoint index {}", index);
while (first || shouldFailOver(copy)) {
+
+ // can we still run
+ if (!isRunAllowed()) {
+ log.trace("Run not allowed, will reject executing exchange: {}", exchange);
+ if (exchange.getException() == null) {
+ exchange.setException(new RejectedExecutionException());
+ }
+ // we cannot process so invoke callback
+ callback.done(true);
+ return true;
+ }
+
if (!first) {
attempts.incrementAndGet();
// are we exhausted by attempts?
@@ -240,6 +276,17 @@ public void done(boolean doneSync) {
}
while (shouldFailOver(copy)) {
+
+ // can we still run
+ if (!isRunAllowed()) {
+ log.trace("Run not allowed, will reject executing exchange: {}", exchange);
+ if (exchange.getException() == null) {
+ exchange.setException(new RejectedExecutionException());
+ }
+ // we cannot process so invoke callback
+ callback.done(false);
+ }
+
attempts.incrementAndGet();
// are we exhausted by attempts?
if (maximumFailoverAttempts > -1 && attempts.get() > maximumFailoverAttempts) {
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.util.StopWatch;
+
+/**
+ * Tests that the failover load balancer will break out if CamelContext is shutting down.
+ */
+public class FailoverLoadBalancerBreakoutDuringShutdownTest extends ContextTestSupport {
+
+ public void testFailover() throws Exception {
+
+ getMockEndpoint("mock:before").expectedMessageCount(1);
+ getMockEndpoint("mock:after").expectedMessageCount(0);
+
+ template.sendBody("seda:start", "Hello World");
+
+ assertMockEndpointsSatisfied();
+
+ // use a stop watch to time how long it takes to force the shutdown
+ StopWatch watch = new StopWatch();
+
+ // force quicker shutdown
+ context.getShutdownStrategy().setTimeout(1);
+ context.stop();
+
+ // should take less than 5 seconds
+ assertTrue("Should take less than 5 seconds, was " + watch.taken(), watch.stop() < 5000);
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+
+ from("seda:start")
+ .to("mock:before")
+ // just keep on failover
+ .loadBalance().failover(-1, false, true)
+ .to("direct:a")
+ .to("direct:b")
+ .end()
+ .to("mock:after");
+
+ from("direct:a")
+ .process(new Processor() {
+ public void process(Exchange exchange) throws Exception {
+ throw new IllegalArgumentException("Forced");
+ }
+ });
+
+ from("direct:b")
+ .process(new Processor() {
+ public void process(Exchange exchange) throws Exception {
+ throw new IllegalArgumentException("Forced");
+ }
+ });
+ }
+ };
+ }
+}

0 comments on commit 78d83ea

Please sign in to comment.