Skip to content

Commit

Permalink
CAMEL-9940: ProducerTemplate - Make extract result set part of UoW
Browse files Browse the repository at this point in the history
  • Loading branch information
davsclaus committed May 4, 2016
1 parent b4a6ad4 commit 1cca6b7
Show file tree
Hide file tree
Showing 2 changed files with 76 additions and 16 deletions.
Expand Up @@ -32,6 +32,7 @@
import org.apache.camel.NoSuchEndpointException; import org.apache.camel.NoSuchEndpointException;
import org.apache.camel.Processor; import org.apache.camel.Processor;
import org.apache.camel.ProducerTemplate; import org.apache.camel.ProducerTemplate;
import org.apache.camel.processor.ConvertBodyProcessor;
import org.apache.camel.spi.Synchronization; import org.apache.camel.spi.Synchronization;
import org.apache.camel.support.ServiceSupport; import org.apache.camel.support.ServiceSupport;
import org.apache.camel.util.CamelContextHelper; import org.apache.camel.util.CamelContextHelper;
Expand Down Expand Up @@ -324,37 +325,44 @@ public Object requestBodyAndHeaders(final Object body, final Map<String, Object>
} }


public <T> T requestBody(Object body, Class<T> type) { public <T> T requestBody(Object body, Class<T> type) {
Object answer = requestBody(body); Exchange exchange = producerCache.send(getMandatoryDefaultEndpoint(), ExchangePattern.InOut, createSetBodyProcessor(body), createConvertBodyProcessor(type));
Object answer = extractResultBody(exchange);
return camelContext.getTypeConverter().convertTo(type, answer); return camelContext.getTypeConverter().convertTo(type, answer);
} }


public <T> T requestBody(Endpoint endpoint, Object body, Class<T> type) { public <T> T requestBody(Endpoint endpoint, Object body, Class<T> type) {
Object answer = requestBody(endpoint, body); Exchange exchange = producerCache.send(endpoint, ExchangePattern.InOut, createSetBodyProcessor(body), createConvertBodyProcessor(type));
Object answer = extractResultBody(exchange);
return camelContext.getTypeConverter().convertTo(type, answer); return camelContext.getTypeConverter().convertTo(type, answer);
} }


public <T> T requestBody(String endpointUri, Object body, Class<T> type) { public <T> T requestBody(String endpointUri, Object body, Class<T> type) {
Object answer = requestBody(endpointUri, body); Exchange exchange = producerCache.send(resolveMandatoryEndpoint(endpointUri), ExchangePattern.InOut, createSetBodyProcessor(body), createConvertBodyProcessor(type));
Object answer = extractResultBody(exchange);
return camelContext.getTypeConverter().convertTo(type, answer); return camelContext.getTypeConverter().convertTo(type, answer);
} }


public <T> T requestBodyAndHeader(Endpoint endpoint, Object body, String header, Object headerValue, Class<T> type) { public <T> T requestBodyAndHeader(Endpoint endpoint, Object body, String header, Object headerValue, Class<T> type) {
Object answer = requestBodyAndHeader(endpoint, body, header, headerValue); Exchange exchange = producerCache.send(endpoint, ExchangePattern.InOut, createBodyAndHeaderProcessor(body, header, headerValue), createConvertBodyProcessor(type));
Object answer = extractResultBody(exchange);
return camelContext.getTypeConverter().convertTo(type, answer); return camelContext.getTypeConverter().convertTo(type, answer);
} }


public <T> T requestBodyAndHeader(String endpointUri, Object body, String header, Object headerValue, Class<T> type) { public <T> T requestBodyAndHeader(String endpointUri, Object body, String header, Object headerValue, Class<T> type) {
Object answer = requestBodyAndHeader(endpointUri, body, header, headerValue); Exchange exchange = producerCache.send(resolveMandatoryEndpoint(endpointUri), ExchangePattern.InOut, createBodyAndHeaderProcessor(body, header, headerValue), createConvertBodyProcessor(type));
Object answer = extractResultBody(exchange);
return camelContext.getTypeConverter().convertTo(type, answer); return camelContext.getTypeConverter().convertTo(type, answer);
} }


public <T> T requestBodyAndHeaders(String endpointUri, Object body, Map<String, Object> headers, Class<T> type) { public <T> T requestBodyAndHeaders(String endpointUri, Object body, Map<String, Object> headers, Class<T> type) {
Object answer = requestBodyAndHeaders(endpointUri, body, headers); Exchange exchange = producerCache.send(resolveMandatoryEndpoint(endpointUri), ExchangePattern.InOut, createBodyAndHeaders(body, headers), createConvertBodyProcessor(type));
Object answer = extractResultBody(exchange);
return camelContext.getTypeConverter().convertTo(type, answer); return camelContext.getTypeConverter().convertTo(type, answer);
} }


public <T> T requestBodyAndHeaders(Endpoint endpoint, Object body, Map<String, Object> headers, Class<T> type) { public <T> T requestBodyAndHeaders(Endpoint endpoint, Object body, Map<String, Object> headers, Class<T> type) {
Object answer = requestBodyAndHeaders(endpoint, body, headers); Exchange exchange = producerCache.send(endpoint, ExchangePattern.InOut, createBodyAndHeaders(body, headers), createConvertBodyProcessor(type));
Object answer = extractResultBody(exchange);
return camelContext.getTypeConverter().convertTo(type, answer); return camelContext.getTypeConverter().convertTo(type, answer);
} }


Expand Down Expand Up @@ -436,6 +444,20 @@ public void process(Exchange exchange) {
}; };
} }


protected Processor createBodyAndHeaders(final Object body, final Map<String, Object> headers) {
return new Processor() {
public void process(Exchange exchange) {
Message in = exchange.getIn();
if (headers != null) {
for (Map.Entry<String, Object> header : headers.entrySet()) {
in.setHeader(header.getKey(), header.getValue());
}
}
in.setBody(body);
}
};
}

protected Processor createBodyAndPropertyProcessor(final Object body, final String property, final Object propertyValue) { protected Processor createBodyAndPropertyProcessor(final Object body, final String property, final Object propertyValue) {
return new Processor() { return new Processor() {
public void process(Exchange exchange) { public void process(Exchange exchange) {
Expand All @@ -455,6 +477,10 @@ public void process(Exchange exchange) {
}; };
} }


protected Processor createConvertBodyProcessor(final Class<?> type) {
return new ConvertBodyProcessor(type);
}

protected Endpoint resolveMandatoryEndpoint(String endpointUri) { protected Endpoint resolveMandatoryEndpoint(String endpointUri) {
Endpoint endpoint = camelContext.getEndpoint(endpointUri); Endpoint endpoint = camelContext.getEndpoint(endpointUri);
if (endpoint == null) { if (endpoint == null) {
Expand Down
52 changes: 43 additions & 9 deletions camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java
Expand Up @@ -16,6 +16,8 @@
*/ */
package org.apache.camel.impl; package org.apache.camel.impl;


import java.util.ArrayList;
import java.util.List;
import java.util.Map; import java.util.Map;


import org.apache.camel.AsyncCallback; import org.apache.camel.AsyncCallback;
Expand All @@ -26,12 +28,12 @@
import org.apache.camel.Exchange; import org.apache.camel.Exchange;
import org.apache.camel.ExchangePattern; import org.apache.camel.ExchangePattern;
import org.apache.camel.FailedToCreateProducerException; import org.apache.camel.FailedToCreateProducerException;
import org.apache.camel.PollingConsumer;
import org.apache.camel.Processor; import org.apache.camel.Processor;
import org.apache.camel.Producer; import org.apache.camel.Producer;
import org.apache.camel.ProducerCallback; import org.apache.camel.ProducerCallback;
import org.apache.camel.ServicePoolAware; import org.apache.camel.ServicePoolAware;
import org.apache.camel.processor.UnitOfWorkProducer; import org.apache.camel.processor.CamelInternalProcessor;
import org.apache.camel.processor.Pipeline;
import org.apache.camel.spi.EndpointUtilizationStatistics; import org.apache.camel.spi.EndpointUtilizationStatistics;
import org.apache.camel.spi.ServicePool; import org.apache.camel.spi.ServicePool;
import org.apache.camel.support.ServiceSupport; import org.apache.camel.support.ServiceSupport;
Expand Down Expand Up @@ -203,7 +205,7 @@ public void startProducer(Endpoint endpoint) throws Exception {
* @param exchange the exchange to send * @param exchange the exchange to send
*/ */
public void send(Endpoint endpoint, Exchange exchange) { public void send(Endpoint endpoint, Exchange exchange) {
sendExchange(endpoint, null, null, exchange); sendExchange(endpoint, null, null, null, exchange);
} }


/** /**
Expand All @@ -219,7 +221,7 @@ public void send(Endpoint endpoint, Exchange exchange) {
* @return the exchange * @return the exchange
*/ */
public Exchange send(Endpoint endpoint, Processor processor) { public Exchange send(Endpoint endpoint, Processor processor) {
return sendExchange(endpoint, null, processor, null); return sendExchange(endpoint, null, processor, null, null);
} }


/** /**
Expand All @@ -236,7 +238,25 @@ public Exchange send(Endpoint endpoint, Processor processor) {
* @return the exchange * @return the exchange
*/ */
public Exchange send(Endpoint endpoint, ExchangePattern pattern, Processor processor) { public Exchange send(Endpoint endpoint, ExchangePattern pattern, Processor processor) {
return sendExchange(endpoint, pattern, processor, null); return sendExchange(endpoint, pattern, processor, null, null);
}

/**
* Sends an exchange to an endpoint using a supplied
* {@link Processor} to populate the exchange
* <p>
* This method will <b>not</b> throw an exception. If processing of the given
* Exchange failed then the exception is stored on the return Exchange
*
* @param endpoint the endpoint to send the exchange to
* @param pattern the message {@link ExchangePattern} such as
* {@link ExchangePattern#InOnly} or {@link ExchangePattern#InOut}
* @param processor the transformer used to populate the new exchange
* @param resultProcessor a processor to process the exchange when the send is complete.
* @return the exchange
*/
public Exchange send(Endpoint endpoint, ExchangePattern pattern, Processor processor, Processor resultProcessor) {
return sendExchange(endpoint, pattern, processor, resultProcessor, null);
} }


/** /**
Expand Down Expand Up @@ -377,7 +397,7 @@ public void done(boolean doneSync) {
} }


protected Exchange sendExchange(final Endpoint endpoint, ExchangePattern pattern, protected Exchange sendExchange(final Endpoint endpoint, ExchangePattern pattern,
final Processor processor, Exchange exchange) { final Processor processor, final Processor resultProcessor, Exchange exchange) {
return doInProducer(endpoint, exchange, pattern, new ProducerCallback<Exchange>() { return doInProducer(endpoint, exchange, pattern, new ProducerCallback<Exchange>() {
public Exchange doInProducer(Producer producer, Exchange exchange, ExchangePattern pattern) { public Exchange doInProducer(Producer producer, Exchange exchange, ExchangePattern pattern) {
if (exchange == null) { if (exchange == null) {
Expand Down Expand Up @@ -408,9 +428,23 @@ public Exchange doInProducer(Producer producer, Exchange exchange, ExchangePatte
watch = new StopWatch(); watch = new StopWatch();
EventHelper.notifyExchangeSending(exchange.getContext(), exchange, endpoint); EventHelper.notifyExchangeSending(exchange.getContext(), exchange, endpoint);
} }
// ensure we run in an unit of work
Producer target = new UnitOfWorkProducer(producer); // if we have a result processor then wrap in pipeline to execute both of them in sequence
target.process(exchange); Processor target;
if (resultProcessor != null) {
List<Processor> processors = new ArrayList<Processor>(2);
processors.add(producer);
processors.add(resultProcessor);
target = Pipeline.newInstance(getCamelContext(), processors);
} else {
target = producer;
}

// wrap in unit of work
CamelInternalProcessor internal = new CamelInternalProcessor(target);
internal.addAdvice(new CamelInternalProcessor.UnitOfWorkProcessorAdvice(null));

internal.process(exchange);
} catch (Throwable e) { } catch (Throwable e) {
// ensure exceptions is caught and set on the exchange // ensure exceptions is caught and set on the exchange
exchange.setException(e); exchange.setException(e);
Expand Down

0 comments on commit 1cca6b7

Please sign in to comment.