Skip to content

Commit

Permalink
Add contract based type awareness and transformer which leverages the…
Browse files Browse the repository at this point in the history
… type metadata
  • Loading branch information
igarashitm committed Sep 16, 2016
1 parent 8dc8d59 commit 498c27d
Show file tree
Hide file tree
Showing 37 changed files with 2,458 additions and 10 deletions.
34 changes: 34 additions & 0 deletions camel-core/src/main/java/org/apache/camel/CamelContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,13 @@
import org.apache.camel.model.remote.ServiceCallConfigurationDefinition;
import org.apache.camel.model.rest.RestDefinition;
import org.apache.camel.model.rest.RestsDefinition;
import org.apache.camel.model.transformer.TransformerDefinition;
import org.apache.camel.spi.AsyncProcessorAwaitManager;
import org.apache.camel.spi.CamelContextNameStrategy;
import org.apache.camel.spi.ClassResolver;
import org.apache.camel.spi.DataFormat;
import org.apache.camel.spi.DataFormatResolver;
import org.apache.camel.spi.DataType;
import org.apache.camel.spi.Debugger;
import org.apache.camel.spi.EndpointRegistry;
import org.apache.camel.spi.EndpointStrategy;
Expand Down Expand Up @@ -69,6 +71,7 @@
import org.apache.camel.spi.ServicePool;
import org.apache.camel.spi.ShutdownStrategy;
import org.apache.camel.spi.StreamCachingStrategy;
import org.apache.camel.spi.Transformer;
import org.apache.camel.spi.TypeConverterRegistry;
import org.apache.camel.spi.UnitOfWorkFactory;
import org.apache.camel.spi.UuidGenerator;
Expand Down Expand Up @@ -1216,6 +1219,37 @@ public interface CamelContext extends SuspendableService, RuntimeConfiguration {
*/
void setDataFormatResolver(DataFormatResolver dataFormatResolver);

/**
* Sets the transformers that can be referenced in the routes.
*
* @param transformers the transformers
*/
void setTransformers(List<TransformerDefinition> transformers);

/**
* Gets the transformers that can be referenced in the routes.
*
* @return the transformers available
*/
List<TransformerDefinition> getTransformers();

/**
* Resolve a transformer given a scheme
*
* @param model data model name.
* @return the resolved transformer, or <tt>null</tt> if not found
*/
Transformer resolveTransformer(String model);

/**
* Resolve a transformer given from/to data type.
*
* @param from from data type
* @param to to data type
* @return the resolved data format, or <tt>null</tt> if not found
*/
Transformer resolveTransformer(DataType from, DataType to);

/**
* Sets the properties that can be referenced in the camel context
*
Expand Down
3 changes: 3 additions & 0 deletions camel-core/src/main/java/org/apache/camel/Exchange.java
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,9 @@ public interface Exchange {
String XSLT_FATAL_ERROR = "CamelXsltFatalError";
String XSLT_WARNING = "CamelXsltWarning";

String INPUT_TYPE = "CamelInputType";
String OUTPUT_TYPE = "CamelOutputType";

/**
* Returns the {@link ExchangePattern} (MEP) of this exchange.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@
import org.apache.camel.impl.converter.BaseTypeConverterRegistry;
import org.apache.camel.impl.converter.DefaultTypeConverter;
import org.apache.camel.impl.converter.LazyLoadingTypeConverter;
import org.apache.camel.impl.transformer.TransformerKey;
import org.apache.camel.management.DefaultManagementMBeanAssembler;
import org.apache.camel.management.DefaultManagementStrategy;
import org.apache.camel.management.JmxSystemPropertyKeys;
Expand All @@ -105,6 +106,7 @@
import org.apache.camel.model.remote.ServiceCallConfigurationDefinition;
import org.apache.camel.model.rest.RestDefinition;
import org.apache.camel.model.rest.RestsDefinition;
import org.apache.camel.model.transformer.TransformerDefinition;
import org.apache.camel.processor.interceptor.BacklogDebugger;
import org.apache.camel.processor.interceptor.BacklogTracer;
import org.apache.camel.processor.interceptor.Debug;
Expand All @@ -119,6 +121,7 @@
import org.apache.camel.spi.Container;
import org.apache.camel.spi.DataFormat;
import org.apache.camel.spi.DataFormatResolver;
import org.apache.camel.spi.DataType;
import org.apache.camel.spi.Debugger;
import org.apache.camel.spi.EndpointRegistry;
import org.apache.camel.spi.EndpointStrategy;
Expand Down Expand Up @@ -150,6 +153,7 @@
import org.apache.camel.spi.ServicePool;
import org.apache.camel.spi.ShutdownStrategy;
import org.apache.camel.spi.StreamCachingStrategy;
import org.apache.camel.spi.Transformer;
import org.apache.camel.spi.TypeConverterRegistry;
import org.apache.camel.spi.UnitOfWorkFactory;
import org.apache.camel.spi.UuidGenerator;
Expand Down Expand Up @@ -273,6 +277,8 @@ public class DefaultCamelContext extends ServiceSupport implements ModelCamelCon
private final StopWatch stopWatch = new StopWatch(false);
private Date startDate;
private ModelJAXBContextFactory modelJAXBContextFactory;
private List<TransformerDefinition> transformers = new ArrayList<TransformerDefinition>();
private Map<TransformerKey, Transformer> transformerRegistry = new HashMap<TransformerKey, Transformer>();

/**
* Creates the {@link CamelContext} using {@link JndiRegistry} as registry,
Expand Down Expand Up @@ -4264,4 +4270,57 @@ protected ModelJAXBContextFactory createModelJAXBContextFactory() {
public String toString() {
return "CamelContext(" + getName() + ")";
}

@Override
public void setTransformers(List<TransformerDefinition> transformers) {
this.transformers = transformers;
}

@Override
public List<TransformerDefinition> getTransformers() {
return transformers;
}

@Override
public Transformer resolveTransformer(String scheme) {
if (scheme == null) {
return null;
}
return resolveTransformer(getTransformerKey(scheme));
}

@Override
public Transformer resolveTransformer(DataType from, DataType to) {
if (from == null || to == null) {
return null;
}
return resolveTransformer(getTransformerKey(from,to));
}

protected Transformer resolveTransformer(TransformerKey key) {
Transformer transformer = transformerRegistry.get(key);
if (transformer != null) {
return transformer;
}
for (TransformerDefinition def : getTransformers()) {
if (key.match(def)) {
try {
transformer = def.createTransformer(this);
} catch (Exception e) {
throw new RuntimeCamelException(e);
}
transformerRegistry.put(key, transformer);

This comment has been minimized.

Copy link
@apupier

apupier Sep 19, 2016

In case of Exception, you put the transformer (which is null) in the map even if it has not been initialized?

return transformer;

This comment has been minimized.

Copy link
@apupier

apupier Sep 19, 2016

in case of Exception ,we return null, might be better to continue the loop?

}
}
return null;
}

protected TransformerKey getTransformerKey(String scheme) {
return new TransformerKey(scheme);
}

protected TransformerKey getTransformerKey(DataType from, DataType to) {
return new TransformerKey(from, to);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@
import org.apache.camel.ExchangePattern;
import org.apache.camel.PollingConsumer;
import org.apache.camel.ResolveEndpointFailedException;
import org.apache.camel.model.ContractAwareDefinition;
import org.apache.camel.spi.Contract;
import org.apache.camel.spi.ContractAware;
import org.apache.camel.spi.ExceptionHandler;
import org.apache.camel.spi.HasId;
import org.apache.camel.spi.UriParam;
Expand All @@ -53,7 +56,7 @@
*
* @version
*/
public abstract class DefaultEndpoint extends ServiceSupport implements Endpoint, HasId, CamelContextAware {
public abstract class DefaultEndpoint extends ServiceSupport implements Endpoint, HasId, CamelContextAware, ContractAware<DefaultEndpoint> {

private static final Logger LOG = LoggerFactory.getLogger(DefaultEndpoint.class);
private final String id = EndpointHelper.createEndpointId();
Expand Down Expand Up @@ -87,6 +90,7 @@ public abstract class DefaultEndpoint extends ServiceSupport implements Endpoint
private int pollingConsumerQueueSize = 1000;
private boolean pollingConsumerBlockWhenFull = true;
private long pollingConsumerBlockTimeout;
private Contract contract;

/**
* Constructs a fully-initialized DefaultEndpoint instance. This is the
Expand Down Expand Up @@ -525,6 +529,17 @@ protected void configurePollingConsumer(PollingConsumer consumer) throws Excepti
configureConsumer(consumer);
}

@Override
public DefaultEndpoint setContract(Contract contract) {
this.contract = contract;
return this;
}

@Override
public Contract getContract() {
return this.contract;
}

@Override
protected void doStart() throws Exception {
// the bridgeErrorHandler/exceptionHandler was originally configured with consumer. prefix, such as consumer.bridgeErrorHandler=true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,10 @@
import org.apache.camel.model.ProcessorDefinition;
import org.apache.camel.model.RouteDefinition;
import org.apache.camel.processor.CamelInternalProcessor;
import org.apache.camel.processor.ContractProcessor;
import org.apache.camel.processor.Pipeline;
import org.apache.camel.spi.Contract;
import org.apache.camel.spi.ContractAware;
import org.apache.camel.spi.InterceptStrategy;
import org.apache.camel.spi.RouteContext;
import org.apache.camel.spi.RoutePolicy;
Expand Down Expand Up @@ -160,6 +163,14 @@ public void commit() {
// force creating the route id so its known ahead of the route is started
String routeId = route.idOrCreate(getCamelContext().getNodeIdFactory());

// wrap with ContractProcessor if the contract is declared on the endpoint
if (endpoint instanceof ContractAware) {
Contract contract = ((ContractAware)endpoint).getContract();
if (contract != null) {
target = new ContractProcessor(target, contract);
}
}

// and wrap it in a unit of work so the UoW is on the top, so the entire route will be in the same UoW
CamelInternalProcessor internal = new CamelInternalProcessor(target);
internal.addAdvice(new CamelInternalProcessor.UnitOfWorkProcessorAdvice(this));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.camel.impl.transformer;

import java.io.InputStream;
import java.io.OutputStream;

import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
import org.apache.camel.Message;
import org.apache.camel.converter.stream.OutputStreamBuilder;
import org.apache.camel.model.DataFormatDefinition;
import org.apache.camel.model.transformer.DataFormatTransformerDefinition;
import org.apache.camel.spi.DataFormat;
import org.apache.camel.spi.DataType;
import org.apache.camel.spi.Transformer;

/**
* A <a href="http://camel.apache.org/transformer.html">Transformer</a>
* leverages DataFormat to perform transformation.
*/
public class DataFormatTransformer extends Transformer {

private String dataFormatRef;

/**
* Perform data transformation with specified from/to type using DataFormat.
* @param message message to apply transformation
* @param from 'from' data type
* @param to 'to' data type
*/
@Override
public void transform(Message message, DataType from, DataType to) throws Exception {
Exchange exchange = message.getExchange();
CamelContext context = exchange.getContext();
DataFormat dataFormat = DataFormatDefinition.getDataFormat(
exchange.getUnitOfWork().getRouteContext(), null, dataFormatRef);

// Unmarshalling into Java Object
if (to.isJavaType() && (from.equals(getFrom()) || from.getModel().equals(getModel()))) {
Object answer = dataFormat.unmarshal(exchange, message.getBody(InputStream.class));
Class<?> toClass = context.getClassResolver().resolveClass(to.getName());
if (!toClass.isAssignableFrom(answer.getClass())) {
answer = context.getTypeConverter().mandatoryConvertTo(toClass, answer);
}
message.setBody(answer);

// Marshalling from Java Object
} else if (from.isJavaType() && (to.equals(getTo()) || to.getModel().equals(getModel()))) {
Object input = message.getBody();
Class<?> fromClass = context.getClassResolver().resolveClass(from.getName());
if (!fromClass.isAssignableFrom(input.getClass())) {
input = context.getTypeConverter().mandatoryConvertTo(fromClass, input);
}
OutputStreamBuilder osb = OutputStreamBuilder.withExchange(exchange);
dataFormat.marshal(exchange, message.getBody(), osb);
message.setBody(osb.build());

} else {
throw new IllegalArgumentException("Unsupported transformation: from='" + from + ", to='" + to + "'");
}
}

/**
* Set DataFormat ref.
* @param ref DataFormat ref
* @return this DataFormatTransformer instance
*/
public DataFormatTransformer setDataFormatRef(String ref) {
this.dataFormatRef = ref;
return this;
}
}

0 comments on commit 498c27d

Please sign in to comment.