Skip to content

Commit

Permalink
CAMEL-3910: Failover LB should have sticky mode.
Browse files Browse the repository at this point in the history
  • Loading branch information
davsclaus committed Jul 12, 2015
1 parent b930834 commit 308529f
Show file tree
Hide file tree
Showing 7 changed files with 338 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -173,10 +173,28 @@ public LoadBalanceDefinition failover(Class<?>... exceptions) {
* @return the builder
*/
public LoadBalanceDefinition failover(int maximumFailoverAttempts, boolean inheritErrorHandler, boolean roundRobin, Class<?>... exceptions) {
return failover(maximumFailoverAttempts, inheritErrorHandler, roundRobin, false, exceptions);
}

/**
* Uses fail over load balancer
*
* @param maximumFailoverAttempts maximum number of failover attempts before exhausting.
* Use -1 to newer exhaust when round robin is also enabled.
* If round robin is disabled then it will exhaust when there are no more endpoints to failover
* @param inheritErrorHandler whether or not to inherit error handler.
* If <tt>false</tt> then it will failover immediately in case of an exception
* @param roundRobin whether or not to use round robin (which keeps state)
* @param sticky whether or not to use sticky (which keeps state)
* @param exceptions exception classes which we want to failover if one of them was thrown
* @return the builder
*/
public LoadBalanceDefinition failover(int maximumFailoverAttempts, boolean inheritErrorHandler, boolean roundRobin, boolean sticky, Class<?>... exceptions) {
FailoverLoadBalancerDefinition def = new FailoverLoadBalancerDefinition();
def.setExceptionTypes(Arrays.asList(exceptions));
def.setMaximumFailoverAttempts(maximumFailoverAttempts);
def.setRoundRobin(roundRobin);
def.setSticky(sticky);
setLoadBalancerType(def);
this.setInheritErrorHandler(inheritErrorHandler);
return this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ public class FailoverLoadBalancerDefinition extends LoadBalancerDefinition {
private List<String> exceptions = new ArrayList<String>();
@XmlAttribute
private Boolean roundRobin;
@XmlAttribute
private Boolean sticky;
@XmlAttribute @Metadata(defaultValue = "-1")
private Integer maximumFailoverAttempts;

Expand Down Expand Up @@ -87,6 +89,9 @@ protected LoadBalancer createLoadBalancer(RouteContext routeContext) {
if (roundRobin != null) {
answer.setRoundRobin(roundRobin);
}
if (sticky != null) {
answer.setSticky(sticky);
}

return answer;
}
Expand Down Expand Up @@ -124,12 +129,31 @@ public Boolean getRoundRobin() {
* If not, then it will always start from the first endpoint when a new message is to be processed.
* In other words it restart from the top for every message.
* If round robin is enabled, then it keeps state and will continue with the next endpoint in a round robin fashion.
* When using round robin it will not stick to last known good endpoint, it will always pick the next endpoint to use.
* <p/>
* You can also enable sticky mode together with round robin, if so then it will pick the last known good endpoint
* to use when starting the load balancing (instead of using the next when starting).
*/
public void setRoundRobin(Boolean roundRobin) {
this.roundRobin = roundRobin;
}

public Boolean getSticky() {
return sticky;
}

/**
* Whether or not the failover load balancer should operate in sticky mode or not.
* If not, then it will always start from the first endpoint when a new message is to be processed.
* In other words it restart from the top for every message.
* If sticky is enabled, then it keeps state and will continue with the last known good endpoint.
* <p/>
* You can also enable sticky mode together with round robin, if so then it will pick the last known good endpoint
* to use when starting the load balancing (instead of using the next when starting).
*/
public void setSticky(Boolean sticky) {
this.sticky = sticky;
}

public Integer getMaximumFailoverAttempts() {
return maximumFailoverAttempts;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,12 @@ public class FailOverLoadBalancer extends LoadBalancerSupport implements Traceab
private final List<Class<?>> exceptions;
private CamelContext camelContext;
private boolean roundRobin;
private boolean sticky;
private int maximumFailoverAttempts = -1;

// stateful counter
private final AtomicInteger counter = new AtomicInteger(-1);
private final AtomicInteger lastGoodIndex = new AtomicInteger();

public FailOverLoadBalancer() {
this.exceptions = null;
Expand Down Expand Up @@ -85,6 +87,14 @@ public void setRoundRobin(boolean roundRobin) {
this.roundRobin = roundRobin;
}

public boolean isSticky() {
return sticky;
}

public void setSticky(boolean sticky) {
this.sticky = sticky;
}

public int getMaximumFailoverAttempts() {
return maximumFailoverAttempts;
}
Expand Down Expand Up @@ -147,7 +157,9 @@ public boolean process(final Exchange exchange, final AsyncCallback callback) {
Exchange copy = null;

// get the next processor
if (isRoundRobin()) {
if (isSticky()) {
index.set(lastGoodIndex.get());
} else if (isRoundRobin()) {
if (counter.incrementAndGet() >= processors.size()) {
counter.set(0);
}
Expand Down Expand Up @@ -214,6 +226,9 @@ public boolean process(final Exchange exchange, final AsyncCallback callback) {
log.trace("Processing exchangeId: {} is continued being processed synchronously", exchange.getExchangeId());
}

// remember last good index
lastGoodIndex.set(index.get());

// and copy the current result to original so it will contain this result of this eip
if (copy != null) {
ExchangeHelper.copyResults(exchange, copy);
Expand Down Expand Up @@ -323,14 +338,17 @@ public void done(boolean doneSync) {
}
}

// remember last good index
lastGoodIndex.set(index.get());

// and copy the current result to original so it will contain this result of this eip
if (copy != null) {
ExchangeHelper.copyResults(exchange, copy);
}
log.debug("Failover complete for exchangeId: {} >>> {}", exchange.getExchangeId(), exchange);
// signal callback we are done
callback.done(false);
};
}
}

public String toString() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.camel.processor;

import org.apache.camel.ContextTestSupport;
import org.apache.camel.builder.RouteBuilder;

/**
* @version
*/
public class FailoverRoundRobinStickyTest extends ContextTestSupport {

public void testFailoverRoundRobinSticky() throws Exception {
getMockEndpoint("mock:bad").expectedBodiesReceived("Hello World");
getMockEndpoint("mock:bad2").expectedBodiesReceived("Hello World");
getMockEndpoint("mock:good").expectedBodiesReceived("Hello World");
getMockEndpoint("mock:good2").expectedMessageCount(0);

template.sendBody("direct:start", "Hello World");

assertMockEndpointsSatisfied();

// as its round robin and sticky based it remembers that last good endpoint
// and will invoke the last good

resetMocks();

getMockEndpoint("mock:bad").expectedMessageCount(0);
getMockEndpoint("mock:bad2").expectedMessageCount(0);
getMockEndpoint("mock:good").expectedBodiesReceived("Bye World");
getMockEndpoint("mock:good2").expectedMessageCount(0);

template.sendBody("direct:start", "Bye World");

assertMockEndpointsSatisfied();
}

@Override
protected RouteBuilder createRouteBuilder() throws Exception {
return new RouteBuilder() {
@Override
public void configure() throws Exception {
// START SNIPPET: e1
from("direct:start")
// Use failover load balancer in stateful round robin and sticky mode
// which mean it will failover immediately in case of an exception
// as it does NOT inherit error handler. It will also keep retrying as
// its configured to newer exhaust.
.loadBalance().failover(-1, false, true, true).
to("direct:bad", "direct:bad2", "direct:good", "direct:good2");
// END SNIPPET: e1

from("direct:bad")
.to("mock:bad")
.throwException(new IllegalArgumentException("Damn"));

from("direct:bad2")
.to("mock:bad2")
.throwException(new IllegalArgumentException("Damn Again"));

from("direct:good")
.to("mock:good");

from("direct:good2")
.to("mock:good2");
}
};
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.camel.processor;

import org.apache.camel.ContextTestSupport;
import org.apache.camel.builder.RouteBuilder;

/**
* @version
*/
public class FailoverStickyTest extends ContextTestSupport {

public void testFailoverSticky() throws Exception {
getMockEndpoint("mock:bad").expectedBodiesReceived("Hello World");
getMockEndpoint("mock:bad2").expectedBodiesReceived("Hello World");
getMockEndpoint("mock:good").expectedBodiesReceived("Hello World");
getMockEndpoint("mock:good2").expectedMessageCount(0);

template.sendBody("direct:start", "Hello World");

assertMockEndpointsSatisfied();

// as its sticky based it remembers that last good endpoint
// and will invoke the last good

resetMocks();

getMockEndpoint("mock:bad").expectedMessageCount(0);
getMockEndpoint("mock:bad2").expectedMessageCount(0);
getMockEndpoint("mock:good").expectedBodiesReceived("Bye World");
getMockEndpoint("mock:good2").expectedMessageCount(0);

template.sendBody("direct:start", "Bye World");

assertMockEndpointsSatisfied();
}

@Override
protected RouteBuilder createRouteBuilder() throws Exception {
return new RouteBuilder() {
@Override
public void configure() throws Exception {
// START SNIPPET: e1
from("direct:start")
// Use failover load balancer in stateful sticky mode
// which mean it will failover immediately in case of an exception
// as it does NOT inherit error handler. It will also keep retrying as
// its configured to newer exhaust.
.loadBalance().failover(-1, false, false, true).
to("direct:bad", "direct:bad2", "direct:good", "direct:good2");
// END SNIPPET: e1

from("direct:bad")
.to("mock:bad")
.throwException(new IllegalArgumentException("Damn"));

from("direct:bad2")
.to("mock:bad2")
.throwException(new IllegalArgumentException("Damn Again"));

from("direct:good")
.to("mock:good");

from("direct:good2")
.to("mock:good2");
}
};
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.camel.spring.processor;

import org.apache.camel.CamelContext;
import org.apache.camel.processor.FailoverRoundRobinStickyTest;

import static org.apache.camel.spring.processor.SpringTestHelper.createSpringCamelContext;

/**
* @version
*/
public class SpringFailoverRoundRobinStickyTest extends FailoverRoundRobinStickyTest {

protected CamelContext createCamelContext() throws Exception {
return createSpringCamelContext(this, "org/apache/camel/spring/processor/FailoverRoundRobinStickyTest.xml");
}
}
Loading

0 comments on commit 308529f

Please sign in to comment.