Skip to content

Commit

Permalink
CAMEL-9879: Circuit Breaker EIP - That is using hystrix. Camel hystri…
Browse files Browse the repository at this point in the history
…x example.
  • Loading branch information
davsclaus committed Apr 21, 2016
1 parent c1649ce commit 7b5ccff
Show file tree
Hide file tree
Showing 16 changed files with 399 additions and 138 deletions.
Expand Up @@ -188,12 +188,30 @@ public HystrixDefinition hystrixConfiguration(String ref) {
} }


/** /**
* The Hystrix fallback route path to execute. * The Hystrix fallback route path to execute that does <b>not</b> go over the network.
* <p>
* This should be a static or cached result that can immediately be returned upon failure.
* If the fallback requires network connection then use {@link #onFallbackViaNetwork()}.
*/ */
public HystrixDefinition onFallback() { public HystrixDefinition onFallback() {
onFallback = new OnFallbackDefinition(); onFallback = new OnFallbackDefinition();
onFallback.setParent(this); onFallback.setParent(this);
return this; return this;
} }


/**
* The Hystrix fallback route path to execute that will go over the network.
* <p/>
* If the fallback will go over the network it is another possible point of failure and so it also needs to be
* wrapped by a HystrixCommand. It is important to execute the fallback command on a separate thread-pool,
* otherwise if the main command were to become latent and fill the thread-pool
* this would prevent the fallback from running if the two commands share the same pool.
*/
public HystrixDefinition onFallbackViaNetwork() {
onFallback = new OnFallbackDefinition();
onFallback.setFallbackViaNetwork(true);
onFallback.setParent(this);
return this;
}

} }
Expand Up @@ -19,6 +19,7 @@
import java.util.List; import java.util.List;
import javax.xml.bind.annotation.XmlAccessType; import javax.xml.bind.annotation.XmlAccessType;
import javax.xml.bind.annotation.XmlAccessorType; import javax.xml.bind.annotation.XmlAccessorType;
import javax.xml.bind.annotation.XmlAttribute;
import javax.xml.bind.annotation.XmlRootElement; import javax.xml.bind.annotation.XmlRootElement;


import org.apache.camel.Processor; import org.apache.camel.Processor;
Expand All @@ -34,12 +35,20 @@
@XmlAccessorType(XmlAccessType.FIELD) @XmlAccessorType(XmlAccessType.FIELD)
public class OnFallbackDefinition extends OutputDefinition<OnFallbackDefinition> { public class OnFallbackDefinition extends OutputDefinition<OnFallbackDefinition> {


@XmlAttribute
@Metadata(label = "command", defaultValue = "false")
private Boolean fallbackViaNetwork;

public OnFallbackDefinition() { public OnFallbackDefinition() {
} }


@Override @Override
public String toString() { public String toString() {
return "OnFallback[" + getOutputs() + "]"; if (fallbackViaNetwork != null && fallbackViaNetwork) {
return "OnFallbackViaNetwork[" + getOutputs() + "]";
} else {
return "OnFallback[" + getOutputs() + "]";
}
} }


@Override @Override
Expand All @@ -49,12 +58,36 @@ public Processor createProcessor(RouteContext routeContext) throws Exception {


@Override @Override
public String getLabel() { public String getLabel() {
CollectionStringBuffer buffer = new CollectionStringBuffer("onFallback["); String name = fallbackViaNetwork != null && fallbackViaNetwork ? "onFallbackViaNetwork" : "onFallback";
CollectionStringBuffer buffer = new CollectionStringBuffer(name);
buffer.append("[");
List<ProcessorDefinition<?>> list = getOutputs(); List<ProcessorDefinition<?>> list = getOutputs();
for (ProcessorDefinition<?> type : list) { for (ProcessorDefinition<?> type : list) {
buffer.append(type.getLabel()); buffer.append(type.getLabel());
} }
buffer.append("]"); buffer.append("]");
return buffer.toString(); return buffer.toString();
} }

public Boolean getFallbackViaNetwork() {
return fallbackViaNetwork;
}

/**
* Whether the fallback goes over the network.
* <p/>
* If the fallback will go over the network it is another possible point of failure and so it also needs to be
* wrapped by a HystrixCommand. It is important to execute the fallback command on a separate thread-pool,
* otherwise if the main command were to become latent and fill the thread-pool
* this would prevent the fallback from running if the two commands share the same pool.
*/
public void setFallbackViaNetwork(Boolean fallbackViaNetwork) {
this.fallbackViaNetwork = fallbackViaNetwork;
}

public boolean isFallbackViaNetwork() {
// is default false
return fallbackViaNetwork != null && fallbackViaNetwork;
}

} }
Expand Up @@ -26,48 +26,70 @@
import org.apache.camel.AsyncCallback; import org.apache.camel.AsyncCallback;
import org.apache.camel.AsyncProcessor; import org.apache.camel.AsyncProcessor;
import org.apache.camel.Exchange; import org.apache.camel.Exchange;
import org.apache.camel.Message;
import org.apache.camel.Navigate; import org.apache.camel.Navigate;
import org.apache.camel.Processor; import org.apache.camel.Processor;
import org.apache.camel.api.management.ManagedAttribute; import org.apache.camel.api.management.ManagedAttribute;
import org.apache.camel.api.management.ManagedResource; import org.apache.camel.api.management.ManagedResource;
import org.apache.camel.spi.IdAware; import org.apache.camel.spi.IdAware;
import org.apache.camel.support.ServiceSupport; import org.apache.camel.support.ServiceSupport;
import org.apache.camel.util.AsyncProcessorConverterHelper;
import org.apache.camel.util.AsyncProcessorHelper; import org.apache.camel.util.AsyncProcessorHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


/** /**
* Implementation of the Hystrix EIP. * Implementation of the Hystrix EIP.
*/ */
@ManagedResource(description = "Managed Hystrix Processor") @ManagedResource(description = "Managed Hystrix Processor")
public class HystrixProcessor extends ServiceSupport implements AsyncProcessor, Navigate<Processor>, org.apache.camel.Traceable, IdAware { public class HystrixProcessor extends ServiceSupport implements AsyncProcessor, Navigate<Processor>, org.apache.camel.Traceable, IdAware {


private static final Logger LOG = LoggerFactory.getLogger(HystrixProcessor.class);
private String id; private String id;
private final HystrixCommandKey commandKey;
private final HystrixCommandGroupKey groupKey; private final HystrixCommandGroupKey groupKey;
private final HystrixCommand.Setter setter; private final HystrixCommandKey commandKey;
private final AsyncProcessor processor; private final HystrixCommandKey fallbackCommandKey;
private final AsyncProcessor fallback; private final com.netflix.hystrix.HystrixCommand.Setter setter;

private final com.netflix.hystrix.HystrixCommand.Setter fallbackSetter;
public HystrixProcessor(HystrixCommandKey commandKey, HystrixCommandGroupKey groupKey, HystrixCommand.Setter setter, private final Processor processor;
Processor processor, Processor fallback) { private final Processor fallback;
this.commandKey = commandKey; private final boolean fallbackViaNetwork;

public HystrixProcessor(HystrixCommandGroupKey groupKey, HystrixCommandKey commandKey, HystrixCommandKey fallbackCommandKey,
HystrixCommand.Setter setter, HystrixCommand.Setter fallbackSetter,
Processor processor, Processor fallback, boolean fallbackViaNetwork) {
this.groupKey = groupKey; this.groupKey = groupKey;
this.commandKey = commandKey;
this.fallbackCommandKey = fallbackCommandKey;
this.setter = setter; this.setter = setter;
this.processor = AsyncProcessorConverterHelper.convert(processor); this.fallbackSetter = fallbackSetter;
this.fallback = AsyncProcessorConverterHelper.convert(fallback); this.processor = processor;
this.fallback = fallback;
this.fallbackViaNetwork = fallbackViaNetwork;
} }


@ManagedAttribute @ManagedAttribute
public String getHystrixCommandKey() { public String getHystrixCommandKey() {
return commandKey.name(); return commandKey.name();
} }


@ManagedAttribute
public String getHystrixFallbackCommandKey() {
if (fallbackCommandKey != null) {
return fallbackCommandKey.name();
} else {
return null;
}
}

@ManagedAttribute @ManagedAttribute
public String getHystrixGroupKey() { public String getHystrixGroupKey() {
return groupKey.name(); return groupKey.name();
} }


@ManagedAttribute
public boolean isFallbackViaNetwork() {
return isFallbackViaNetwork();
}

@ManagedAttribute @ManagedAttribute
public int getHystrixTotalTimeMean() { public int getHystrixTotalTimeMean() {
HystrixCommandMetrics metrics = HystrixCommandMetrics.getInstance(commandKey); HystrixCommandMetrics metrics = HystrixCommandMetrics.getInstance(commandKey);
Expand Down Expand Up @@ -161,38 +183,24 @@ public void process(Exchange exchange) throws Exception {
} }


@Override @Override
public boolean process(final Exchange exchange, final AsyncCallback callback) { public boolean process(Exchange exchange, AsyncCallback callback) {
// run this as if we run inside try .. catch so there is no regular Camel error handler // run this as if we run inside try .. catch so there is no regular Camel error handler
exchange.setProperty(Exchange.TRY_ROUTE_BLOCK, true); exchange.setProperty(Exchange.TRY_ROUTE_BLOCK, true);


try { try {
// create command HystrixProcessorCommandFallbackViaNetwork fallbackCommand = null;
HystrixProcessorCommand command = new HystrixProcessorCommand(setter, exchange, callback, processor, fallback); if (fallbackViaNetwork) {

fallbackCommand = new HystrixProcessorCommandFallbackViaNetwork(fallbackSetter, exchange, fallback);
// execute the command asynchronous and observe when its done }
command.observe().subscribe((msg) -> { HystrixProcessorCommand command = new HystrixProcessorCommand(setter, exchange, processor, fallback, fallbackCommand);
if (command.isResponseFromCache()) { command.execute();
// its from cache so need to copy it into the exchange
Message target = exchange.hasOut() ? exchange.getOut() : exchange.getIn();
target.copyFrom(msg);
} else {
// if it was not from cache then run/fallback was executed and the result
// is already set correctly on the exchange and we do not need to do anything
}
}, throwable -> {
exchange.setException(throwable);
}, () -> {
exchange.removeProperty(Exchange.TRY_ROUTE_BLOCK);
callback.done(false);
});
} catch (Throwable e) { } catch (Throwable e) {
// error adding to queue, so set as error and we are done
exchange.setException(e); exchange.setException(e);
callback.done(true);
return true;
} }


return false; exchange.removeProperty(Exchange.TRY_ROUTE_BLOCK);
callback.done(true);
return true;
} }


@Override @Override
Expand Down
Expand Up @@ -5,9 +5,9 @@
* The ASF licenses this file to You under the Apache License, Version 2.0 * The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with * (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at * the License. You may obtain a copy of the License at
* * <p/>
* http://www.apache.org/licenses/LICENSE-2.0 * http://www.apache.org/licenses/LICENSE-2.0
* * <p/>
* Unless required by applicable law or agreed to in writing, software * Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, * distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
Expand All @@ -17,30 +17,30 @@
package org.apache.camel.component.hystrix.processor; package org.apache.camel.component.hystrix.processor;


import com.netflix.hystrix.HystrixCommand; import com.netflix.hystrix.HystrixCommand;
import org.apache.camel.AsyncCallback;
import org.apache.camel.AsyncProcessor;
import org.apache.camel.Exchange; import org.apache.camel.Exchange;
import org.apache.camel.Message; import org.apache.camel.Message;
import org.apache.camel.Processor;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;


/** /**
* Hystrix Command for the Camel Hystrix EIP. * Hystrix Command for the Camel Hystrix EIP.
*/ */
public class HystrixProcessorCommand extends HystrixCommand<Message> { public class HystrixProcessorCommand extends HystrixCommand {


private static final Logger LOG = LoggerFactory.getLogger(HystrixProcessorCommand.class); private static final Logger LOG = LoggerFactory.getLogger(HystrixProcessorCommand.class);
private final Exchange exchange; private final Exchange exchange;
private final AsyncCallback callback; private final Processor processor;
private final AsyncProcessor processor; private final Processor fallback;
private final AsyncProcessor fallback; private final HystrixProcessorCommandFallbackViaNetwork fallbackCommand;


public HystrixProcessorCommand(Setter setter, Exchange exchange, AsyncCallback callback, AsyncProcessor processor, AsyncProcessor fallback) { public HystrixProcessorCommand(Setter setter, Exchange exchange, Processor processor, Processor fallback,
HystrixProcessorCommandFallbackViaNetwork fallbackCommand) {
super(setter); super(setter);
this.exchange = exchange; this.exchange = exchange;
this.callback = callback;
this.processor = processor; this.processor = processor;
this.fallback = fallback; this.fallback = fallback;
this.fallbackCommand = fallbackCommand;
} }


@Override @Override
Expand All @@ -51,30 +51,33 @@ protected Message getFallback() {
return exchange.hasOut() ? exchange.getOut() : exchange.getIn(); return exchange.hasOut() ? exchange.getOut() : exchange.getIn();
} }


try { if (fallback != null || fallbackCommand != null) {
if (fallback != null) { LOG.debug("Error occurred processing. Will now run fallback. Exception class: {} message: {}.", exception.getClass().getName(), exception.getMessage());
LOG.debug("Error occurred processing. Will now run fallback. Exception class: {} message: {}.", exception.getClass().getName(), exception.getMessage()); // store the last to endpoint as the failure endpoint
// store the last to endpoint as the failure endpoint if (exchange.getProperty(Exchange.FAILURE_ENDPOINT) == null) {
if (exchange.getProperty(Exchange.FAILURE_ENDPOINT) == null) { exchange.setProperty(Exchange.FAILURE_ENDPOINT, exchange.getProperty(Exchange.TO_ENDPOINT));
exchange.setProperty(Exchange.FAILURE_ENDPOINT, exchange.getProperty(Exchange.TO_ENDPOINT)); }
} // give the rest of the pipeline another chance
// give the rest of the pipeline another chance exchange.setProperty(Exchange.EXCEPTION_HANDLED, true);
exchange.setProperty(Exchange.EXCEPTION_HANDLED, true); exchange.setProperty(Exchange.EXCEPTION_CAUGHT, exception);
exchange.setProperty(Exchange.EXCEPTION_CAUGHT, exception); exchange.setException(null);
exchange.setException(null); // and we should not be regarded as exhausted as we are in a try .. catch block
// and we should not be regarded as exhausted as we are in a try .. catch block exchange.removeProperty(Exchange.REDELIVERY_EXHAUSTED);
exchange.removeProperty(Exchange.REDELIVERY_EXHAUSTED); // run the fallback processor
// run the fallback processor try {
try { // use fallback command if provided (fallback via network)
if (fallbackCommand != null) {
return fallbackCommand.execute();
} else {
LOG.debug("Running fallback: {} with exchange: {}", fallback, exchange); LOG.debug("Running fallback: {} with exchange: {}", fallback, exchange);
fallback.process(exchange, callback); // process the fallback until its fully done
} catch (Exception e) { // (we do not hav any hystrix callback to leverage so we need to complete all work in this run method)
exchange.setException(e); fallback.process(exchange);
LOG.debug("Running fallback: {} with exchange: {} done", fallback, exchange);
} }
} catch (Exception e) {
exchange.setException(e);
} }
} finally {
LOG.debug("Running fallback: {} with exchange: {} done", fallback, exchange);
exchange.removeProperty(Exchange.TRY_ROUTE_BLOCK);
} }


return exchange.hasOut() ? exchange.getOut() : exchange.getIn(); return exchange.hasOut() ? exchange.getOut() : exchange.getIn();
Expand All @@ -85,7 +88,9 @@ protected Message run() throws Exception {
LOG.debug("Running processor: {} with exchange: {}", processor, exchange); LOG.debug("Running processor: {} with exchange: {}", processor, exchange);


try { try {
processor.process(exchange, callback); // process the processor until its fully done
// (we do not hav any hystrix callback to leverage so we need to complete all work in this run method)
processor.process(exchange);
} catch (Exception e) { } catch (Exception e) {
exchange.setException(e); exchange.setException(e);
} }
Expand Down

0 comments on commit 7b5ccff

Please sign in to comment.