Permalink
Browse files

CAMEL-6327: More work on new camel-netty-http component.

git-svn-id: https://svn.apache.org/repos/asf/camel/trunk@1478078 13f79535-47bb-0310-9956-ffa450edef68
  • Loading branch information...
1 parent ac6a8c1 commit b0ccd947445df66a980034bb87f3fdbdb8066d02 @davsclaus davsclaus committed May 1, 2013
@@ -58,7 +58,7 @@ public DefaultNettyHttpBinding(HeaderFilterStrategy headerFilterStrategy) {
public Message toCamelMessage(HttpRequest request, Exchange exchange) throws Exception {
LOG.trace("toCamelMessage: {}", request);
- NettyHttpMessage answer = new NettyHttpMessage(request, this);
+ NettyHttpMessage answer = new NettyHttpMessage(request, null, this);
// force getting headers which will populate them
answer.getHeaders();
@@ -98,6 +98,46 @@ public void populateCamelHeaders(HttpRequest request, Map<String, Object> header
}
}
+ @Override
+ public Message toCamelMessage(HttpResponse response, Exchange exchange) throws Exception {
+ LOG.trace("toCamelMessage: {}", response);
+
+ NettyHttpMessage answer = new NettyHttpMessage(null, response, this);
+ // force getting headers which will populate them
+ answer.getHeaders();
+
+ // keep the body as is, and use type converters
+ answer.setBody(response.getContent());
+ return answer;
+ }
+
+ @Override
+ public void populateCamelHeaders(HttpResponse response, Map<String, Object> headers, Exchange exchange) throws Exception {
+ LOG.trace("populateCamelHeaders: {}", response);
+
+ headers.put(Exchange.HTTP_RESPONSE_CODE, response.getStatus().getCode());
+ // TODO: use another status header
+ headers.put("CamelHttpResponseText", response.getStatus().getReasonPhrase());
+
+ for (String name : response.getHeaderNames()) {
+ // mapping the content-type
+ if (name.toLowerCase().equals("content-type")) {
+ name = Exchange.CONTENT_TYPE;
+ }
+ // add the headers one by one, and use the header filter strategy
+ List<String> values = response.getHeaders(name);
+ Iterator<?> it = ObjectHelper.createIterator(values);
+ while (it.hasNext()) {
+ Object extracted = it.next();
+ LOG.trace("HTTP-header: {}", extracted);
+ if (headerFilterStrategy != null
+ && !headerFilterStrategy.applyFilterToExternalHeaders(name, extracted, exchange)) {
+ NettyHttpHelper.appendHeader(headers, name, extracted);
+ }
+ }
+ }
+ }
+
@Override
public HttpResponse toNettyResponse(Message message) throws Exception {
LOG.trace("toNettyResponse: {}", message);
@@ -37,27 +37,29 @@
public class HttpClientPipelineFactory extends ClientPipelineFactory {
private static final Logger LOG = LoggerFactory.getLogger(HttpClientPipelineFactory.class);
- private NettyProducer producer;
+ private NettyHttpProducer producer;
private SSLContext sslContext;
public HttpClientPipelineFactory() {
// default constructor needed
}
- public HttpClientPipelineFactory(NettyProducer nettyProducer) {
+ public HttpClientPipelineFactory(NettyHttpProducer nettyProducer) {
this.producer = nettyProducer;
try {
this.sslContext = createSSLContext(producer);
} catch (Exception e) {
throw ObjectHelper.wrapRuntimeCamelException(e);
}
- LOG.info("Created SslContext {}", sslContext);
+ if (sslContext != null) {
+ LOG.info("Created SslContext {}", sslContext);
+ }
}
@Override
public ClientPipelineFactory createPipelineFactory(NettyProducer nettyProducer) {
- return new HttpClientPipelineFactory(nettyProducer);
+ return new HttpClientPipelineFactory((NettyHttpProducer) nettyProducer);
}
@Override
@@ -55,7 +55,9 @@ public HttpServerPipelineFactory(NettyHttpConsumer nettyConsumer) {
throw ObjectHelper.wrapRuntimeCamelException(e);
}
- LOG.info("Created SslContext {}", sslContext);
+ if (sslContext != null) {
+ LOG.info("Created SslContext {}", sslContext);
+ }
}
@Override
@@ -50,7 +50,27 @@
void populateCamelHeaders(HttpRequest request, Map<String, Object> headers, Exchange exchange) throws Exception;
/**
- * Binds from Camel {@link Message} to Netty {@link org.jboss.netty.handler.codec.http.HttpResponse}.
+ * Binds from Netty {@link HttpResponse} to Camel {@Message}.
+ *
+ * @param response the netty http response
+ * @param exchange the exchange that should contain the returned message.
+ * @return the message to store on the given exchange
+ * @throws Exception is thrown if error during binding
+ */
+ Message toCamelMessage(HttpResponse response, Exchange exchange) throws Exception;
+
+ /**
+ * Binds from Netty {@link HttpResponse} to Camel headers as a {@link Map}.
+ *
+ * @param response the netty http response
+ * @param headers the Camel headers that should be populated
+ * @param exchange the exchange that should contain the returned message.
+ * @throws Exception is thrown if error during binding
+ */
+ void populateCamelHeaders(HttpResponse response, Map<String, Object> headers, Exchange exchange) throws Exception;
+
+ /**
+ * Binds from Camel {@link Message} to Netty {@link HttpResponse}.
*
* @param message the Camel message
* @return the http response
@@ -42,7 +42,12 @@ public static Object convertToHttpRequest(Class<?> type, Exchange exchange, Obje
// okay we may need to cheat a bit when we want to grab the HttpRequest as its stored on the NettyHttpMessage
// so if the message instance is a NettyHttpMessage and its body is the value, then we can grab the
// HttpRequest from the NettyHttpMessage
- NettyHttpMessage msg = exchange.getIn(NettyHttpMessage.class);
+ NettyHttpMessage msg;
+ if (exchange.hasOut()) {
+ msg = exchange.getOut(NettyHttpMessage.class);
+ } else {
+ msg = exchange.getIn(NettyHttpMessage.class);
+ }
if (msg != null && msg.getBody() == value) {
return msg.getHttpRequest();
}
@@ -51,6 +56,31 @@ public static Object convertToHttpRequest(Class<?> type, Exchange exchange, Obje
return null;
}
+ /**
+ * A fallback converter that allows us to easily call Java beans and use the raw Netty {@link HttpRequest} as parameter types.
+ */
+ @FallbackConverter
+ public static Object convertToHttpResponse(Class<?> type, Exchange exchange, Object value, TypeConverterRegistry registry) {
+ // if we want to covert to convertToHttpResponse
+ if (value != null && HttpResponse.class.isAssignableFrom(type)) {
+
+ // okay we may need to cheat a bit when we want to grab the HttpRequest as its stored on the NettyHttpMessage
+ // so if the message instance is a NettyHttpMessage and its body is the value, then we can grab the
+ // HttpRequest from the NettyHttpMessage
+ NettyHttpMessage msg;
+ if (exchange.hasOut()) {
+ msg = exchange.getOut(NettyHttpMessage.class);
+ } else {
+ msg = exchange.getIn(NettyHttpMessage.class);
+ }
+ if (msg != null && msg.getBody() == value) {
+ return msg.getHttpResponse();
+ }
+ }
+
+ return null;
+ }
+
@Converter
public static String toString(HttpResponse response, Exchange exchange) {
String contentType = response.getHeader(Exchange.CONTENT_TYPE);
@@ -21,6 +21,7 @@
import org.apache.camel.RuntimeCamelException;
import org.apache.camel.impl.DefaultMessage;
import org.jboss.netty.handler.codec.http.HttpRequest;
+import org.jboss.netty.handler.codec.http.HttpResponse;
/**
* Netty HTTP based {@link org.apache.camel.Message}.
@@ -31,28 +32,38 @@
public class NettyHttpMessage extends DefaultMessage {
private final transient HttpRequest httpRequest;
+ private final transient HttpResponse httpResponse;
private final transient NettyHttpBinding httpBinding;
- public NettyHttpMessage(HttpRequest httpRequest, NettyHttpBinding httpBinding) {
+ public NettyHttpMessage(HttpRequest httpRequest, HttpResponse httpResponse, NettyHttpBinding httpBinding) {
this.httpRequest = httpRequest;
+ this.httpResponse = httpResponse;
this.httpBinding = httpBinding;
}
public HttpRequest getHttpRequest() {
return httpRequest;
}
+ public HttpResponse getHttpResponse() {
+ return httpResponse;
+ }
+
@Override
protected void populateInitialHeaders(Map<String, Object> map) {
try {
- httpBinding.populateCamelHeaders(httpRequest, map, getExchange());
+ if (httpRequest != null) {
+ httpBinding.populateCamelHeaders(httpRequest, map, getExchange());
+ } else {
+ httpBinding.populateCamelHeaders(httpResponse, map, getExchange());
+ }
} catch (Exception e) {
throw new RuntimeCamelException("Error populating initial headers", e);
}
}
@Override
public DefaultMessage newInstance() {
- return new NettyHttpMessage(httpRequest, httpBinding);
+ return new NettyHttpMessage(httpRequest, httpResponse, httpBinding);
}
}
@@ -16,26 +16,44 @@
*/
package org.apache.camel.component.netty.http.handlers;
-import org.apache.camel.component.netty.NettyProducer;
+import org.apache.camel.Exchange;
+import org.apache.camel.Message;
import org.apache.camel.component.netty.handlers.ClientChannelHandler;
import org.apache.camel.component.netty.http.NettyHttpProducer;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.MessageEvent;
+import org.jboss.netty.handler.codec.http.HttpResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+/**
+ * Netty HTTP {@link org.apache.camel.component.netty.handlers.ClientChannelHandler} that handles the response combing
+ * back from thhe HTTP server, called by this client.
+ *
+ */
public class HttpClientChannelHandler extends ClientChannelHandler {
- // use NettyHttpConsumer as logger to make it easier to read the logs as this is part of the producer
+ // use NettyHttpProducer as logger to make it easier to read the logs as this is part of the producer
private static final transient Logger LOG = LoggerFactory.getLogger(NettyHttpProducer.class);
+ private final NettyHttpProducer producer;
+ private HttpResponse response;
- public HttpClientChannelHandler(NettyProducer producer) {
+ public HttpClientChannelHandler(NettyHttpProducer producer) {
super(producer);
+ this.producer = producer;
}
@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent messageEvent) throws Exception {
- super.messageReceived(ctx, messageEvent); //To change body of overridden methods use File | Settings | File Templates.
+ // store response, as this channel handler is created per pipeline
+ response = (HttpResponse) messageEvent.getMessage();
+
+ super.messageReceived(ctx, messageEvent);
}
+ @Override
+ protected Message getResponseMessage(Exchange exchange, MessageEvent messageEvent) throws Exception {
+ // use the binding
+ return producer.getEndpoint().getNettyHttpBinding().toCamelMessage(response, exchange);
+ }
}
@@ -16,7 +16,10 @@
*/
package org.apache.camel.component.netty.http;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;
+import org.jboss.netty.handler.codec.http.HttpResponse;
import org.junit.Test;
public class NettyHttpProducerSimpleTest extends BaseNettyTest {
@@ -31,6 +34,31 @@ public void testHttpSimple() throws Exception {
assertMockEndpointsSatisfied();
}
+ @Test
+ public void testHttpSimpleExchange() throws Exception {
+ getMockEndpoint("mock:input").expectedBodiesReceived("Hello World");
+
+ Exchange out = template.request("netty-http:http://localhost:{{port}}/foo", new Processor() {
+ @Override
+ public void process(Exchange exchange) throws Exception {
+ exchange.getIn().setBody("Hello World");
+ }
+ });
+ assertNotNull(out);
+ assertTrue(out.hasOut());
+
+ NettyHttpMessage response = out.getOut(NettyHttpMessage.class);
+ assertNotNull(response);
+ assertEquals(200, response.getHttpResponse().getStatus().getCode());
+
+ // we can also get the response as body
+ HttpResponse body = out.getOut().getBody(HttpResponse.class);
+ assertNotNull(body);
+ assertEquals(200, body.getStatus().getCode());
+
+ assertMockEndpointsSatisfied();
+ }
+
@Override
protected RouteBuilder createRouteBuilder() throws Exception {
return new RouteBuilder() {
Oops, something went wrong.

0 comments on commit b0ccd94

Please sign in to comment.