Skip to content

Commit

Permalink
CAMEL-8526: Add more EIP as specialized mbeans
Browse files Browse the repository at this point in the history
  • Loading branch information
davsclaus committed Jul 25, 2015
1 parent c744d59 commit 7563d57
Show file tree
Hide file tree
Showing 18 changed files with 163 additions and 19 deletions.
Expand Up @@ -23,4 +23,7 @@ public interface ManagedRandomLoadBalancerMBean extends ManagedProcessorMBean {
@ManagedAttribute(description = "Number of processors in the load balancer")
Integer getSize();

@ManagedAttribute(description = "Processor id of the last chosen processor")
String getLastChosenProcessorId();

}
Expand Up @@ -23,4 +23,7 @@ public interface ManagedRoundRobinLoadBalancerMBean extends ManagedProcessorMBea
@ManagedAttribute(description = "Number of processors in the load balancer")
Integer getSize();

@ManagedAttribute(description = "Processor id of the last chosen processor")
String getLastChosenProcessorId();

}
Expand Up @@ -29,4 +29,7 @@ public interface ManagedStickyLoadBalancerMBean extends ManagedProcessorMBean {
@ManagedAttribute(description = "Number of processors in the load balancer")
Integer getSize();

@ManagedAttribute(description = "Processor id of the last chosen processor")
String getLastChosenProcessorId();

}
Expand Up @@ -32,4 +32,7 @@ public interface ManagedWeightedBalancerMBean extends ManagedProcessorMBean {
@ManagedAttribute(description = "The delimiter")
String getDistributionRatioDelimiter();

@ManagedAttribute(description = "Processor id of the last chosen processor")
String getLastChosenProcessorId();

}
Expand Up @@ -20,6 +20,7 @@
import org.apache.camel.api.management.ManagedResource;
import org.apache.camel.api.management.mbean.ManagedRandomLoadBalancerMBean;
import org.apache.camel.model.LoadBalanceDefinition;
import org.apache.camel.model.ProcessorDefinition;
import org.apache.camel.processor.loadbalancer.RandomLoadBalancer;

/**
Expand All @@ -34,8 +35,27 @@ public ManagedRandomLoadBalancer(CamelContext context, RandomLoadBalancer proces
this.processor = processor;
}

@Override
public LoadBalanceDefinition getDefinition() {
return (LoadBalanceDefinition) super.getDefinition();
}

@Override
public Integer getSize() {
return processor.getProcessors().size();
}

@Override
public String getLastChosenProcessorId() {
int idx = processor.getLastChosenProcessorIndex();
if (idx != -1) {
LoadBalanceDefinition def = getDefinition();
ProcessorDefinition<?> output = def.getOutputs().get(idx);
if (output != null) {
return output.getId();
}
}
return null;
}

}
Expand Up @@ -20,6 +20,7 @@
import org.apache.camel.api.management.ManagedResource;
import org.apache.camel.api.management.mbean.ManagedRoundRobinLoadBalancerMBean;
import org.apache.camel.model.LoadBalanceDefinition;
import org.apache.camel.model.ProcessorDefinition;
import org.apache.camel.processor.loadbalancer.RoundRobinLoadBalancer;

/**
Expand All @@ -34,8 +35,26 @@ public ManagedRoundRobinLoadBalancer(CamelContext context, RoundRobinLoadBalance
this.processor = processor;
}

@Override
public LoadBalanceDefinition getDefinition() {
return (LoadBalanceDefinition) super.getDefinition();
}

@Override
public Integer getSize() {
return processor.getProcessors().size();
}

@Override
public String getLastChosenProcessorId() {
int idx = processor.getLastChosenProcessorIndex();
if (idx != -1) {
LoadBalanceDefinition def = getDefinition();
ProcessorDefinition<?> output = def.getOutputs().get(idx);
if (output != null) {
return output.getId();
}
}
return null;
}
}
Expand Up @@ -20,6 +20,7 @@
import org.apache.camel.api.management.ManagedResource;
import org.apache.camel.api.management.mbean.ManagedStickyLoadBalancerMBean;
import org.apache.camel.model.LoadBalanceDefinition;
import org.apache.camel.model.ProcessorDefinition;
import org.apache.camel.model.loadbalancer.StickyLoadBalancerDefinition;
import org.apache.camel.processor.loadbalancer.StickyLoadBalancer;

Expand Down Expand Up @@ -56,4 +57,18 @@ public String getExpression() {
public Integer getSize() {
return processor.getProcessors().size();
}

@Override
public String getLastChosenProcessorId() {
int idx = processor.getLastChosenProcessorIndex();
if (idx != -1) {
LoadBalanceDefinition def = getDefinition();
ProcessorDefinition<?> output = def.getOutputs().get(idx);
if (output != null) {
return output.getId();
}
}
return null;
}

}
Expand Up @@ -20,6 +20,7 @@
import org.apache.camel.api.management.ManagedResource;
import org.apache.camel.api.management.mbean.ManagedWeightedBalancerMBean;
import org.apache.camel.model.LoadBalanceDefinition;
import org.apache.camel.model.ProcessorDefinition;
import org.apache.camel.model.loadbalancer.WeightedLoadBalancerDefinition;
import org.apache.camel.processor.loadbalancer.WeightedLoadBalancer;

Expand Down Expand Up @@ -74,4 +75,18 @@ public String getDistributionRatioDelimiter() {
return null;
}
}

@Override
public String getLastChosenProcessorId() {
int idx = processor.getLastChosenProcessorIndex();
if (idx != -1) {
LoadBalanceDefinition def = getDefinition();
ProcessorDefinition<?> output = def.getOutputs().get(idx);
if (output != null) {
return output.getId();
}
}
return null;
}

}
Expand Up @@ -30,6 +30,7 @@
public class RandomLoadBalancer extends QueueLoadBalancer {

private static final Random RANDOM = new Random();
private transient int index;

protected synchronized Processor chooseProcessor(List<Processor> processors, Exchange exchange) {
int size = processors.size();
Expand All @@ -41,10 +42,14 @@ protected synchronized Processor chooseProcessor(List<Processor> processors, Exc
}

// pick a random
int index = RANDOM.nextInt(size);
index = RANDOM.nextInt(size);
return processors.get(index);
}

public int getLastChosenProcessorIndex() {
return index;
}

public String toString() {
return "RandomLoadBalancer";
}
Expand Down
Expand Up @@ -37,6 +37,10 @@ protected synchronized Processor chooseProcessor(List<Processor> processors, Exc
return processors.get(counter);
}

public int getLastChosenProcessorIndex() {
return counter;
}

public String toString() {
return "RoundRobinLoadBalancer";
}
Expand Down
Expand Up @@ -34,21 +34,13 @@
*/
public class StickyLoadBalancer extends QueueLoadBalancer {
private Expression correlationExpression;
private QueueLoadBalancer loadBalancer;
private RoundRobinLoadBalancer loadBalancer;
private int numberOfHashGroups = 64 * 1024;
private final Map<Object, Processor> stickyMap = new HashMap<Object, Processor>();

public StickyLoadBalancer(Expression correlationExpression) {
this(correlationExpression, new RoundRobinLoadBalancer());
}

public StickyLoadBalancer(Expression correlationExpression, QueueLoadBalancer loadBalancer) {
this.correlationExpression = correlationExpression;
this.loadBalancer = loadBalancer;
}

public Expression getCorrelationExpression() {
return correlationExpression;
this.loadBalancer = new RoundRobinLoadBalancer();
}

protected synchronized Processor chooseProcessor(List<Processor> processors, Exchange exchange) {
Expand Down Expand Up @@ -80,6 +72,13 @@ public void removeProcessor(Processor processor) {
super.removeProcessor(processor);
}

public int getLastChosenProcessorIndex() {
return loadBalancer.getLastChosenProcessorIndex();
}

public Expression getCorrelationExpression() {
return correlationExpression;
}

// Properties
//-------------------------------------------------------------------------
Expand Down
Expand Up @@ -22,6 +22,8 @@
public abstract class WeightedLoadBalancer extends QueueLoadBalancer {
private List<Integer> distributionRatioList = new ArrayList<Integer>();
private List<DistributionRatio> runtimeRatios = new ArrayList<DistributionRatio>();

transient int lastIndex;

public WeightedLoadBalancer(List<Integer> distributionRatios) {
deepCloneDistributionRatios(distributionRatios);
Expand All @@ -34,6 +36,10 @@ protected void deepCloneDistributionRatios(List<Integer> distributionRatios) {
}
}

public int getLastChosenProcessorIndex() {
return lastIndex;
}

@Override
protected void doStart() throws Exception {
super.doStart();
Expand Down
Expand Up @@ -26,7 +26,7 @@ public class WeightedRandomLoadBalancer extends WeightedLoadBalancer {
private final Random rnd = new Random();
private final int distributionRatioSum;
private int runtimeRatioSum;

public WeightedRandomLoadBalancer(List<Integer> distributionRatioList) {
super(distributionRatioList);
int sum = 0;
Expand All @@ -39,8 +39,9 @@ public WeightedRandomLoadBalancer(List<Integer> distributionRatioList) {

@Override
protected Processor chooseProcessor(List<Processor> processors, Exchange exchange) {
int selectedProcessorIndex = selectProcessIndex();
return processors.get(selectedProcessorIndex);
int index = selectProcessIndex();
lastIndex = index;
return processors.get(index);
}

public int selectProcessIndex() {
Expand Down
Expand Up @@ -48,6 +48,8 @@ protected synchronized Processor chooseProcessor(List<Processor> processors, Exc
counter++;
}
}

lastIndex = counter;

return processors.get(counter++);
}
Expand Down
Expand Up @@ -34,6 +34,8 @@ public void testManageRandomLoadBalancer() throws Exception {
return;
}

template.sendBodyAndHeader("direct:start", "Hello World", "foo", "123");

// get the stats for the route
MBeanServer mbeanServer = getMBeanServer();

Expand All @@ -53,6 +55,9 @@ public void testManageRandomLoadBalancer() throws Exception {
Integer size = (Integer) mbeanServer.getAttribute(on, "Size");
assertEquals(2, size.intValue());

String last = (String) mbeanServer.getAttribute(on, "LastChosenProcessorId");
assertTrue("foo".equals(last) || "bar".equals(last));

TabularData data = (TabularData) mbeanServer.invoke(on, "explain", new Object[]{false}, new String[]{"boolean"});
assertNotNull(data);
assertEquals(2, data.size());
Expand All @@ -73,7 +78,7 @@ protected RouteBuilder createRouteBuilder() throws Exception {
public void configure() throws Exception {
from("direct:start")
.loadBalance().random().id("mysend")
.to("mock:foo", "mock:bar");
.to("mock:foo").id("foo").to("mock:bar").id("bar");
}
};
}
Expand Down
Expand Up @@ -22,6 +22,7 @@

import org.apache.camel.ServiceStatus;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;

/**
* @version
Expand All @@ -34,6 +35,17 @@ public void testManageRoundRobinLoadBalancer() throws Exception {
return;
}

MockEndpoint foo = getMockEndpoint("mock:foo");
foo.expectedMessageCount(1);

MockEndpoint bar = getMockEndpoint("mock:bar");
bar.expectedMessageCount(1);

template.sendBodyAndHeader("direct:start", "Hello World", "foo", "123");
template.sendBodyAndHeader("direct:start", "Bye World", "foo", "123");

assertMockEndpointsSatisfied();

// get the stats for the route
MBeanServer mbeanServer = getMBeanServer();

Expand All @@ -53,6 +65,9 @@ public void testManageRoundRobinLoadBalancer() throws Exception {
Integer size = (Integer) mbeanServer.getAttribute(on, "Size");
assertEquals(2, size.intValue());

String last = (String) mbeanServer.getAttribute(on, "LastChosenProcessorId");
assertEquals("bar", last);

TabularData data = (TabularData) mbeanServer.invoke(on, "explain", new Object[]{false}, new String[]{"boolean"});
assertNotNull(data);
assertEquals(2, data.size());
Expand All @@ -73,7 +88,7 @@ protected RouteBuilder createRouteBuilder() throws Exception {
public void configure() throws Exception {
from("direct:start")
.loadBalance().roundRobin().id("mysend")
.to("mock:foo", "mock:bar");
.to("mock:foo").id("foo").to("mock:bar").id("bar");
}
};
}
Expand Down
Expand Up @@ -28,12 +28,14 @@
*/
public class ManagedStickyLoadBalancerTest extends ManagementTestSupport {

public void testManageRandomLoadBalancer() throws Exception {
public void testManageStickyLoadBalancer() throws Exception {
// JMX tests dont work well on AIX CI servers (hangs them)
if (isPlatform("aix")) {
return;
}

template.sendBodyAndHeader("direct:start", "Hello World", "num", "123");

// get the stats for the route
MBeanServer mbeanServer = getMBeanServer();

Expand All @@ -59,6 +61,14 @@ public void testManageRandomLoadBalancer() throws Exception {
String uri = (String) mbeanServer.getAttribute(on, "Expression");
assertEquals("num", uri);

String last = (String) mbeanServer.getAttribute(on, "LastChosenProcessorId");
assertTrue("foo".equals(last) || "bar".equals(last));

template.sendBodyAndHeader("direct:start", "Bye World", "num", "123");

String last2 = (String) mbeanServer.getAttribute(on, "LastChosenProcessorId");
assertEquals("Should be sticky", last, last2);

TabularData data = (TabularData) mbeanServer.invoke(on, "explain", new Object[]{false}, new String[]{"boolean"});
assertNotNull(data);
assertEquals(2, data.size());
Expand All @@ -79,7 +89,7 @@ protected RouteBuilder createRouteBuilder() throws Exception {
public void configure() throws Exception {
from("direct:start")
.loadBalance().sticky(header("num")).id("mysend")
.to("mock:foo", "mock:bar");
.to("mock:foo").id("foo").to("mock:bar").id("bar");
}
};
}
Expand Down

0 comments on commit 7563d57

Please sign in to comment.