Skip to content

Commit

Permalink
CAMEL-9999 : ServiceCall : improve APIs
Browse files Browse the repository at this point in the history
  • Loading branch information
lburgazzoli committed Jun 7, 2016
1 parent 7eae925 commit 326e615
Show file tree
Hide file tree
Showing 32 changed files with 1,120 additions and 1,350 deletions.
Expand Up @@ -16,7 +16,7 @@
*/ */
package org.apache.camel.impl.remote; package org.apache.camel.impl.remote;


import java.util.Collection; import java.util.List;
import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.RejectedExecutionException;


import org.apache.camel.AsyncCallback; import org.apache.camel.AsyncCallback;
Expand All @@ -25,33 +25,33 @@
import org.apache.camel.CamelContextAware; import org.apache.camel.CamelContextAware;
import org.apache.camel.Exchange; import org.apache.camel.Exchange;
import org.apache.camel.ExchangePattern; import org.apache.camel.ExchangePattern;
import org.apache.camel.Expression;
import org.apache.camel.Traceable; import org.apache.camel.Traceable;
import org.apache.camel.processor.SendDynamicProcessor; import org.apache.camel.processor.SendDynamicProcessor;
import org.apache.camel.spi.IdAware; import org.apache.camel.spi.IdAware;
import org.apache.camel.spi.ServiceCallLoadBalancer; import org.apache.camel.spi.ServiceCallLoadBalancer;
import org.apache.camel.spi.ServiceCallServer; import org.apache.camel.spi.ServiceCallServer;
import org.apache.camel.spi.ServiceCallServerListStrategy; import org.apache.camel.spi.ServiceCallServerListStrategy;
import org.apache.camel.support.ServiceCallExpressionSupport;
import org.apache.camel.support.ServiceSupport; import org.apache.camel.support.ServiceSupport;
import org.apache.camel.util.AsyncProcessorHelper; import org.apache.camel.util.AsyncProcessorHelper;
import org.apache.camel.util.ObjectHelper; import org.apache.camel.util.ObjectHelper;
import org.apache.camel.util.ServiceHelper; import org.apache.camel.util.ServiceHelper;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;


public class DefaultServiceCallProcessor extends ServiceSupport implements AsyncProcessor, CamelContextAware, Traceable, IdAware { public class DefaultServiceCallProcessor<S extends ServiceCallServer> extends ServiceSupport implements AsyncProcessor, CamelContextAware, Traceable, IdAware {
private static final Logger LOG = LoggerFactory.getLogger(DefaultServiceCallProcessor.class); private static final Logger LOG = LoggerFactory.getLogger(DefaultServiceCallProcessor.class);


private final ServiceCallExpressionSupport serviceCallExpression;
private final ExchangePattern exchangePattern; private final ExchangePattern exchangePattern;
private final String uri;
private final String name; private final String name;
private final String scheme; private final String scheme;
private final String uri;
private final String contextPath; private final String contextPath;
private CamelContext camelContext; private CamelContext camelContext;
private String id; private String id;
private ServiceCallServerListStrategy<ServiceCallServer> serverListStrategy; private ServiceCallServerListStrategy<S> serverListStrategy;
private ServiceCallLoadBalancer<ServiceCallServer> loadBalancer; private ServiceCallLoadBalancer<S> loadBalancer;
private Expression serviceCallExpression;
private SendDynamicProcessor processor; private SendDynamicProcessor processor;


public DefaultServiceCallProcessor(String name, String scheme, String uri, ExchangePattern exchangePattern) { public DefaultServiceCallProcessor(String name, String scheme, String uri, ExchangePattern exchangePattern) {
Expand Down Expand Up @@ -110,40 +110,76 @@ public String getTraceLabel() {
return id; return id;
} }


public ServiceCallLoadBalancer<ServiceCallServer> getLoadBalancer() { public String getName() {
return name;
}

public String getScheme() {
return scheme;
}

public String getContextPath() {
return contextPath;
}

public String getUri() {
return uri;
}

public ExchangePattern getExchangePattern() {
return exchangePattern;
}

public ServiceCallLoadBalancer<S> getLoadBalancer() {
return loadBalancer; return loadBalancer;
} }


public void setLoadBalancer(ServiceCallLoadBalancer<ServiceCallServer> loadBalancer) { public void setLoadBalancer(ServiceCallLoadBalancer<S> loadBalancer) {
this.loadBalancer = loadBalancer; this.loadBalancer = loadBalancer;
} }


public DefaultServiceCallProcessor loadBalancer(ServiceCallLoadBalancer<ServiceCallServer> loadBalancer) { public DefaultServiceCallProcessor loadBalancer(ServiceCallLoadBalancer<S> loadBalancer) {
setLoadBalancer(loadBalancer); setLoadBalancer(loadBalancer);
return this; return this;
} }


public ServiceCallServerListStrategy<ServiceCallServer> getServerListStrategy() { public ServiceCallServerListStrategy<S> getServerListStrategy() {
return serverListStrategy; return serverListStrategy;
} }


public void setServerListStrategy(ServiceCallServerListStrategy<ServiceCallServer> serverListStrategy) { public void setServerListStrategy(ServiceCallServerListStrategy<S> serverListStrategy) {
this.serverListStrategy = serverListStrategy; this.serverListStrategy = serverListStrategy;
} }


public DefaultServiceCallProcessor serverListStrategy(ServiceCallServerListStrategy<ServiceCallServer> serverListStrategy) { public DefaultServiceCallProcessor serverListStrategy(ServiceCallServerListStrategy<S> serverListStrategy) {
setServerListStrategy(serverListStrategy); setServerListStrategy(serverListStrategy);
return this; return this;
} }


public void setServiceCallExpression(Expression serviceCallExpression) {
this.serviceCallExpression = serviceCallExpression;
}

public Expression getServiceCallExpression() {
return serviceCallExpression;
}

public DefaultServiceCallProcessor serviceCallExpression(Expression serviceCallExpression) {
setServiceCallExpression(serviceCallExpression);
return this;
}

public AsyncProcessor getProcessor() {
return processor;
}

@Override @Override
protected void doStart() throws Exception { protected void doStart() throws Exception {
ObjectHelper.notEmpty(getName(), "name", "serviceName");
ObjectHelper.notNull(camelContext, "camelContext"); ObjectHelper.notNull(camelContext, "camelContext");
ObjectHelper.notNull(serverListStrategy, "serverListStrategy"); ObjectHelper.notNull(serviceCallExpression, "serviceCallExpression");
ObjectHelper.notNull(loadBalancer, "loadBalancer");



LOG.info("ConsulsServiceCall at dc: {} with service name: {} is using load balancer: {} and service discovery: {}", LOG.info("ServiceCall with service name: {} is using load balancer: {} and service discovery: {}",
name, loadBalancer, serverListStrategy); name, loadBalancer, serverListStrategy);


processor = new SendDynamicProcessor(uri, serviceCallExpression); processor = new SendDynamicProcessor(uri, serviceCallExpression);
Expand All @@ -167,33 +203,48 @@ public void process(Exchange exchange) throws Exception {


@Override @Override
public boolean process(Exchange exchange, AsyncCallback callback) { public boolean process(Exchange exchange, AsyncCallback callback) {
Collection<ServiceCallServer> servers = null; final String serviceName = exchange.getIn().getHeader(ServiceCallConstants.SERVICE_NAME, name, String.class);
String serviceName = exchange.getIn().getHeader(ServiceCallConstants.SERVICE_NAME, name, String.class); final ServiceCallServer server = chooseServer(exchange, serviceName);
try {
servers = serverListStrategy.getUpdatedListOfServers(serviceName);
if (servers == null || servers.isEmpty()) {
exchange.setException(new RejectedExecutionException("No active services with name " + name));
}
} catch (Throwable e) {
exchange.setException(e);
}


if (exchange.getException() != null) { if (exchange.getException() != null) {
callback.done(true); callback.done(true);
return true; return true;
} }


// let the client load balancer chose which server to use
ServiceCallServer server = loadBalancer.chooseServer(servers);
String ip = server.getIp(); String ip = server.getIp();
int port = server.getPort(); int port = server.getPort();
LOG.debug("Service {} active at server: {}:{}", name, ip, port); LOG.debug("Service {} active at server: {}:{}", name, ip, port);


// set selected server as header // set selected server as header
exchange.getIn().setHeader(ServiceCallConstants.SERVER_IP, ip); exchange.getIn().setHeader(ServiceCallConstants.SERVER_IP, ip);
exchange.getIn().setHeader(ServiceCallConstants.SERVER_PORT, port); exchange.getIn().setHeader(ServiceCallConstants.SERVER_PORT, port);
exchange.getIn().setHeader(ServiceCallConstants.SERVICE_NAME, serviceName);


// use the dynamic send processor to call the service // use the dynamic send processor to call the service
return processor.process(exchange, callback); return processor.process(exchange, callback);
} }

protected S chooseServer(Exchange exchange, String serviceName) {
ObjectHelper.notNull(serverListStrategy, "serverListStrategy");
ObjectHelper.notNull(loadBalancer, "loadBalancer");

S server = null;

try {
List<S> servers = serverListStrategy.getUpdatedListOfServers(serviceName);
if (servers == null || servers.isEmpty()) {
exchange.setException(new RejectedExecutionException("No active services with name " + name));
}

// let the client load balancer chose which server to use
server = servers.size() > 1 ? loadBalancer.chooseServer(servers) : servers.get(0);
if (server == null) {
exchange.setException(new RejectedExecutionException("No active services with name " + name));
}
} catch (Throwable e) {
exchange.setException(e);
}

return server;
}
} }

0 comments on commit 326e615

Please sign in to comment.