Skip to content

Commit

Permalink
CAMEL-8965: WireTap supports dynamic uris like toD does
Browse files Browse the repository at this point in the history
  • Loading branch information
davsclaus committed Jul 20, 2015
1 parent 4376cb3 commit cc55671
Show file tree
Hide file tree
Showing 23 changed files with 135 additions and 190 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@

public interface ManagedWireTapMBean extends ManagedProcessorMBean {

@ManagedAttribute(description = "Expression that returns the uri to use for the wire tap destination", mask = true)
String getExpression();
@ManagedAttribute(description = "The uri of the endpoint to wiretap to. The uri can be dynamic computed using the expressions.", mask = true)
String getUri();

@ManagedAttribute(description = "Sets the maximum size used by the ProducerCache which is used to cache and reuse producers")
Integer getCacheSize();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,17 +41,17 @@ public void init(ManagementStrategy strategy) {
super.init(strategy);
boolean sanitize = strategy.getManagementAgent().getMask() != null ? strategy.getManagementAgent().getMask() : false;
if (sanitize) {
uri = URISupport.sanitizeUri(processor.getExpression().toString());
uri = URISupport.sanitizeUri(processor.getUri());
} else {
uri = processor.getExpression().toString();
uri = processor.getUri();
}
}

public WireTapProcessor getProcessor() {
return processor;
}

public String getExpression() {
public String getUri() {
return uri;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2256,44 +2256,14 @@ public Type rollback(String message) {
* destination gets a copy of the original message to avoid the processors
* interfering with each other using {@link ExchangePattern#InOnly}.
*
* @param uri the dynamic endpoint to wiretap to (resolved using simple language by default)
* @return the builder
*/
public ExpressionClause<WireTapDefinition> wireTap() {
public WireTapDefinition<Type> wireTap(String uri) {
WireTapDefinition answer = new WireTapDefinition();
answer.setUri(uri);
addOutput(answer);
return ExpressionClause.createAndSetExpression(answer);
}

/**
* <a href="http://camel.apache.org/wiretap.html">WireTap EIP:</a>
* Sends messages to all its child outputs; so that each processor and
* destination gets a copy of the original message to avoid the processors
* interfering with each other using {@link ExchangePattern#InOnly}.
*
* @param expression the expression to compute the uri to use as wire tap
* @return the builder
*/
public Type wireTap(Expression expression) {
WireTapDefinition answer = new WireTapDefinition();
answer.setExpression(new ExpressionDefinition(expression));
addOutput(answer);
return (Type) this;
}

/**
* <a href="http://camel.apache.org/wiretap.html">WireTap EIP:</a>
* Sends messages to all its child outputs; so that each processor and
* destination gets a copy of the original message to avoid the processors
* interfering with each other using {@link ExchangePattern#InOnly}.
*
* @param uri the destination
* @return the builder
*/
public Type wireTap(String uri) {
WireTapDefinition answer = new WireTapDefinition();
answer.setExpression(new SimpleExpression(uri));
addOutput(answer);
return (Type) this;
return answer;
}

/**
Expand All @@ -2302,17 +2272,19 @@ public Type wireTap(String uri) {
* destination gets a copy of the original message to avoid the processors
* interfering with each other using {@link ExchangePattern#InOnly}.
*
* @param uri the destination
* @param uri the dynamic endpoint to wiretap to (resolved using simple language by default)
* @param executorService a custom {@link ExecutorService} to use as thread pool
* for sending tapped exchanges
* @return the builder
* @deprecated use the fluent builder from {@link WireTapDefinition}, will be removed in Camel 3.0
*/
public Type wireTap(String uri, ExecutorService executorService) {
@Deprecated
public WireTapDefinition<Type> wireTap(String uri, ExecutorService executorService) {
WireTapDefinition answer = new WireTapDefinition();
answer.setExpression(new SimpleExpression(uri));
answer.setUri(uri);
answer.setExecutorService(executorService);
addOutput(answer);
return (Type) this;
return answer;
}

/**
Expand All @@ -2321,17 +2293,19 @@ public Type wireTap(String uri, ExecutorService executorService) {
* destination gets a copy of the original message to avoid the processors
* interfering with each other using {@link ExchangePattern#InOnly}.
*
* @param uri the destination
* @param uri the dynamic endpoint to wiretap to (resolved using simple language by default)
* @param executorServiceRef reference to lookup a custom {@link ExecutorService}
* to use as thread pool for sending tapped exchanges
* @return the builder
* @deprecated use the fluent builder from {@link WireTapDefinition}, will be removed in Camel 3.0
*/
public Type wireTap(String uri, String executorServiceRef) {
@Deprecated
public WireTapDefinition<Type> wireTap(String uri, String executorServiceRef) {
WireTapDefinition answer = new WireTapDefinition();
answer.setExpression(new SimpleExpression(uri));
answer.setUri(uri);
answer.setExecutorServiceRef(executorServiceRef);
addOutput(answer);
return (Type) this;
return answer;
}

/**
Expand All @@ -2342,11 +2316,13 @@ public Type wireTap(String uri, String executorServiceRef) {
* Will use a copy of the original Exchange which is passed in as argument
* to the given expression
*
* @param uri the destination
* @param uri the dynamic endpoint to wiretap to (resolved using simple language by default)
* @param body expression that creates the body to send
* @return the builder
* @deprecated use the fluent builder from {@link WireTapDefinition}, will be removed in Camel 3.0
*/
public Type wireTap(String uri, Expression body) {
@Deprecated
public WireTapDefinition<Type> wireTap(String uri, Expression body) {
return wireTap(uri, true, body);
}

Expand All @@ -2355,35 +2331,39 @@ public Type wireTap(String uri, Expression body) {
* Sends a new {@link org.apache.camel.Exchange} to the destination
* using {@link ExchangePattern#InOnly}.
*
* @param uri the destination
* @param uri the dynamic endpoint to wiretap to (resolved using simple language by default)
* @param copy whether or not use a copy of the original exchange or a new empty exchange
* @return the builder
* @deprecated use the fluent builder from {@link WireTapDefinition}, will be removed in Camel 3.0
*/
public Type wireTap(String uri, boolean copy) {
@Deprecated
public WireTapDefinition<Type> wireTap(String uri, boolean copy) {
WireTapDefinition answer = new WireTapDefinition();
answer.setExpression(new SimpleExpression(uri));
answer.setUri(uri);
answer.setCopy(copy);
addOutput(answer);
return (Type) this;
return answer;
}

/**
* <a href="http://camel.apache.org/wiretap.html">WireTap EIP:</a>
* Sends a new {@link org.apache.camel.Exchange} to the destination
* using {@link ExchangePattern#InOnly}.
*
* @param uri the destination
* @param uri the dynamic endpoint to wiretap to (resolved using simple language by default)
* @param copy whether or not use a copy of the original exchange or a new empty exchange
* @param body expression that creates the body to send
* @return the builder
* @deprecated use the fluent builder from {@link WireTapDefinition}, will be removed in Camel 3.0
*/
public Type wireTap(String uri, boolean copy, Expression body) {
@Deprecated
public WireTapDefinition<Type> wireTap(String uri, boolean copy, Expression body) {
WireTapDefinition answer = new WireTapDefinition();
answer.setExpression(new SimpleExpression(uri));
answer.setUri(uri);
answer.setCopy(copy);
answer.setNewExchangeExpression(body);
answer.setNewExchangeExpression(new ExpressionSubElementDefinition(body));
addOutput(answer);
return (Type) this;
return answer;
}

/**
Expand All @@ -2394,11 +2374,13 @@ public Type wireTap(String uri, boolean copy, Expression body) {
* Will use a copy of the original Exchange which is passed in as argument
* to the given processor
*
* @param uri the destination
* @param uri the dynamic endpoint to wiretap to (resolved using simple language by default)
* @param processor processor preparing the new exchange to send
* @return the builder
* @deprecated use the fluent builder from {@link WireTapDefinition}, will be removed in Camel 3.0
*/
public Type wireTap(String uri, Processor processor) {
@Deprecated
public WireTapDefinition<Type> wireTap(String uri, Processor processor) {
return wireTap(uri, true, processor);
}

Expand All @@ -2407,18 +2389,20 @@ public Type wireTap(String uri, Processor processor) {
* Sends a new {@link org.apache.camel.Exchange} to the destination
* using {@link ExchangePattern#InOnly}.
*
* @param uri the destination
* @param uri the dynamic endpoint to wiretap to (resolved using simple language by default)
* @param copy whether or not use a copy of the original exchange or a new empty exchange
* @param processor processor preparing the new exchange to send
* @return the builder
* @deprecated use the fluent builder from {@link WireTapDefinition}, will be removed in Camel 3.0
*/
public Type wireTap(String uri, boolean copy, Processor processor) {
@Deprecated
public WireTapDefinition<Type> wireTap(String uri, boolean copy, Processor processor) {
WireTapDefinition answer = new WireTapDefinition();
answer.setExpression(new SimpleExpression(uri));
answer.setUri(uri);
answer.setCopy(copy);
answer.setNewExchangeProcessor(processor);
addOutput(answer);
return (Type) this;
return answer;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,20 @@ public ToDynamicDefinition(String uri) {
public Processor createProcessor(RouteContext routeContext) throws Exception {
ObjectHelper.notEmpty(uri, "uri", this);

Expression exp = createExpression(routeContext);

SendDynamicProcessor processor = new SendDynamicProcessor(uri, exp);
processor.setPattern(pattern);
if (cacheSize != null) {
processor.setCacheSize(cacheSize);
}
if (ignoreInvalidEndpoint != null) {
processor.setIgnoreInvalidEndpoint(ignoreInvalidEndpoint);
}
return processor;
}

protected Expression createExpression(RouteContext routeContext) {
List<Expression> list = new ArrayList<Expression>();
String[] parts = uri.split("\\+");
for (String part : parts) {
Expand Down Expand Up @@ -97,15 +111,7 @@ public Processor createProcessor(RouteContext routeContext) throws Exception {
exp = ExpressionBuilder.concatExpression(list);
}

SendDynamicProcessor processor = new SendDynamicProcessor(uri, exp);
processor.setPattern(pattern);
if (cacheSize != null) {
processor.setCacheSize(cacheSize);
}
if (ignoreInvalidEndpoint != null) {
processor.setIgnoreInvalidEndpoint(ignoreInvalidEndpoint);
}
return processor;
return exp;
}

@Override
Expand Down
Loading

0 comments on commit cc55671

Please sign in to comment.