Skip to content

Commit

Permalink
Merge 279d10b into e2e9d3c
Browse files Browse the repository at this point in the history
  • Loading branch information
kkovarik committed Sep 9, 2019
2 parents e2e9d3c + 279d10b commit b654afb
Show file tree
Hide file tree
Showing 24 changed files with 1,825 additions and 1 deletion.
@@ -0,0 +1,91 @@
package org.openhubframework.openhub.component.circuitbreaker;

import java.util.Map;

import org.apache.camel.CamelContext;
import org.apache.camel.Endpoint;
import org.apache.camel.Produce;
import org.apache.camel.ProducerTemplate;
import org.apache.camel.impl.DefaultComponent;
import org.apache.camel.util.StringHelper;
import org.openhubframework.openhub.spi.circuitbreaker.CircuitBreaker;
import org.openhubframework.openhub.spi.circuitbreaker.CircuitConfiguration;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.util.Assert;

/**
* Custom component for simple use of circuit breaker.
*
* Usage:
* .to("circuit:<circuit-name>:<uri-to-call>"),
* where:
* <circuit-name> is unique name of circuit, recommended granularity
* is one circuit per target system.
* <uri-to-call> is uri to be invoked if circuit is up.
* Configuration:
* {@link CircuitConfiguration} in Exchange property {@link CircuitBreaker#CONFIGURATION_PROPERTY}
*
* @author Karel Kovarik
* @see CircuitBreaker
* @since 2.2
*/
public class CircuitComponent extends DefaultComponent {

/**
* CircuitBreaker interface implementation.
* If none is provided, component cannot be used.
*/
@Autowired
private CircuitBreaker circuitBreaker;

/**
* Camel producer template.
*/
@Produce
private ProducerTemplate producerTemplate;

/**
* CamelContext.
*/
@Autowired
private CamelContext camelContext;


@Override
protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception {
final CircuitEndpoint endpoint = new CircuitEndpoint(uri, this);

final String endpointURI = StringHelper.after(uri, ":");
Assert.hasText(endpointURI, "the endpointURI must not be empty");

final String name = StringHelper.before(endpointURI, ":");
Assert.hasText(name, "the circuitName must not be empty.");
final String trimmedName = name.replaceAll("/", "").trim();
Assert.hasText(trimmedName, "the trimmed circuitName must not be empty.");
final String targetUri = StringHelper.after(endpointURI, ":");
Assert.hasText(targetUri, "the targetUri must not be empty.");

endpoint.setCircuitName(trimmedName);
endpoint.setTargetUri(targetUri);
return endpoint;
}

/**
* Get instance of CircuitBreaker implementation.
*/
protected CircuitBreaker getCircuitBreaker() {
return circuitBreaker;
}

/**
* Get instance of producerTemplate.
*/
protected ProducerTemplate getProducerTemplate() {
return producerTemplate;
}

@Override
protected void validateParameters(String uri, Map<String, Object> parameters, String optionPrefix) {
// do nothing, do not call validation from parent - DefaultComponent
}
}
@@ -0,0 +1,82 @@
package org.openhubframework.openhub.component.circuitbreaker;

import org.apache.camel.Consumer;
import org.apache.camel.Processor;
import org.apache.camel.Producer;
import org.apache.camel.ProducerTemplate;
import org.apache.camel.impl.DefaultEndpoint;
import org.openhubframework.openhub.spi.circuitbreaker.CircuitBreaker;


/**
* Circuit breaker endpoint.
*
* @author Karel Kovarik
* @since 2.2
*/
public class CircuitEndpoint extends DefaultEndpoint {

private String circuitName;
private String targetUri;

public CircuitEndpoint(String endpointUri, CircuitComponent component) {
super(endpointUri, component);
}

@Override
public Producer createProducer() throws Exception {
return new CircuitProducer(this);
}

@Override
public Consumer createConsumer(Processor processor) throws Exception {
throw new UnsupportedOperationException("Circuit breaker does not support consumer endpoint.");
}

@Override
public boolean isSingleton() {
return true;
}

/**
* Get circuit name.
*/
public String getCircuitName() {
return circuitName;
}

/**
* Set circuit name.
*/
public void setCircuitName(String circuitName) {
this.circuitName = circuitName;
}

/**
* Get target uri.
*/
public String getTargetUri() {
return targetUri;
}

/**
* Set target uri.
*/
public void setTargetUri(String targetUri) {
this.targetUri = targetUri;
}

/**
* Get circuit breaker instance.
*/
protected CircuitBreaker getCircuitBreaker() {
return ((CircuitComponent) getComponent()).getCircuitBreaker();
}

/**
* Get producer template instance.
*/
protected ProducerTemplate getProducerTemplate() {
return ((CircuitComponent) getComponent()).getProducerTemplate();
}
}
@@ -0,0 +1,62 @@
package org.openhubframework.openhub.component.circuitbreaker;

import static org.springframework.util.StringUtils.hasText;

import org.apache.camel.Exchange;
import org.apache.camel.impl.DefaultProducer;
import org.openhubframework.openhub.spi.circuitbreaker.CircuitBreaker;
import org.openhubframework.openhub.spi.circuitbreaker.CircuitConfiguration;
import org.springframework.util.Assert;

/**
* Camel Producer for Circuit breaker component.
*
* @author Karel Kovarik
* @see org.apache.camel.Producer
* @see CircuitComponent
* @since 2.2
*/
public class CircuitProducer extends DefaultProducer {

/**
* New instance of circuit producer.
*
* @param endpoint the related circuit endpoint.
*/
public CircuitProducer(CircuitEndpoint endpoint) {
super(endpoint);
}

@Override
public void process(Exchange exchange) throws Exception {
// verify configuration is present, otherwise fail as it is misconfigured
final CircuitConfiguration circuitConfiguration = exchange.getProperty(
CircuitBreaker.CONFIGURATION_PROPERTY, CircuitConfiguration.class);
Assert.notNull(circuitConfiguration, "the circuitConfiguration was not found,"
+ "it is expected to be set in exchange property with name ["
+ CircuitBreaker.CONFIGURATION_PROPERTY + "].");

// fill circuitName (if not set in configuration already)
if (!hasText(circuitConfiguration.getCircuitName())) {
circuitConfiguration.setCircuitName(getCircuitEndpoint().getCircuitName());
}

final CircuitEndpoint endpoint = getCircuitEndpoint();
try {
// check circuit state, will throw exception if circuit is down
endpoint.getCircuitBreaker().checkCircuitIsOpen().process(exchange);
// invoke targetUri
exchange = endpoint.getProducerTemplate().send(endpoint.getTargetUri(), exchange);
} finally {
// update circuitState
endpoint.getCircuitBreaker().updateCircuitState().process(exchange);
}
}

/**
* Get related circuit endpoint.
*/
public CircuitEndpoint getCircuitEndpoint() {
return (CircuitEndpoint) getEndpoint();
}
}
@@ -0,0 +1,4 @@
/**
* "circuit" camel component.
*/
package org.openhubframework.openhub.component.circuitbreaker;
@@ -0,0 +1 @@
class=org.openhubframework.openhub.component.circuitbreaker.CircuitComponent
@@ -0,0 +1,101 @@
package org.openhubframework.openhub.component.circuitbreaker;

import org.apache.camel.EndpointInject;
import org.apache.camel.Produce;
import org.apache.camel.ProducerTemplate;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
import org.junit.Before;
import org.junit.Test;
import org.openhubframework.openhub.api.route.AbstractBasicRoute;
import org.openhubframework.openhub.component.AbstractComponentsTest;
import org.openhubframework.openhub.spi.circuitbreaker.CircuitBreaker;
import org.openhubframework.openhub.spi.circuitbreaker.CircuitConfiguration;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.test.context.TestPropertySource;

/**
* Simple test suite for {@link CircuitComponent}.
*
* @author Karel Kovarik
* @since 2.2
*/
@TestPropertySource(properties = {
"ohf.circuitbreaker.enabled=true",
"ohf.circuitbreaker.impl=org.openhubframework.openhub.core.circuitbreaker.CircuitBreakerInMemoryImpl",
"ohf.foo.circuitbreaker.enabled=true",
"ohf.foo.circuitbreaker.thresholdPercentage=90",
"ohf.foo.circuitbreaker.windowSizeInMillis=10000",
"ohf.foo.circuitbreaker.minimalCountInWindow=10",
"ohf.foo.circuitbreaker.sleepInMillis=30000",
})
@EnableConfigurationProperties(CircuitComponentTest.FooCircuitConfigurationProperties.class)
public class CircuitComponentTest extends AbstractComponentsTest {

@Produce(uri = "direct:start")
private ProducerTemplate producer;

@EndpointInject(uri = "mock:test")
private MockEndpoint mock;

@Autowired
private FooCircuitConfigurationProperties fooCircuitConfigProps;

@Before
public void prepareRoutes() throws Exception {
final RouteBuilder testedRoute = new AbstractBasicRoute() {
@Override
public void doConfigure() throws Exception {
from("direct:start")
// configure circuit breaker component
.setProperty(CircuitBreaker.CONFIGURATION_PROPERTY, constant(fooCircuitConfigProps))
// circuit:<circuit-name>:<uri>
.to("circuit:foo:mock:test");
}
};
// add to camel context
getCamelContext().addRoutes(testedRoute);
}

@Test
public void test_passThrough() throws Exception {

mock.expectedMessageCount(2);
producer.sendBody("payload-1");
producer.sendBody("payload-2");
mock.assertIsSatisfied();
}

@Test
public void test_circuitDown() throws Exception {
fooCircuitConfigProps.setMinimalCountInWindow(1L);
fooCircuitConfigProps.setThresholdPercentage(1);
fooCircuitConfigProps.setSleepInMillis(60_000);
fooCircuitConfigProps.setWindowSizeInMillis(30_000);

mock.whenExchangeReceived(1, exchange -> {
throw new RuntimeException("Something went wrong.");
});

mock.expectedMessageCount(1);
sendBodyInTryCatch("payload-1");
sendBodyInTryCatch("payload-2");
mock.assertIsSatisfied();
}

// util methods
protected void sendBodyInTryCatch(Object body) {
try {
producer.sendBody(body);
} catch (Exception ex) {
// do nothing
}
}

@ConfigurationProperties(prefix = "ohf.foo.circuitbreaker")
public static class FooCircuitConfigurationProperties extends CircuitConfiguration {
// nothing in here, already inherited from CircuitConfiguration
}
}

0 comments on commit b654afb

Please sign in to comment.