Skip to content

Commit

Permalink
CAMEL-6327: More work on new camel-netty-http component.
Browse files Browse the repository at this point in the history
git-svn-id: https://svn.apache.org/repos/asf/camel/trunk@1478078 13f79535-47bb-0310-9956-ffa450edef68
  • Loading branch information
davsclaus committed May 1, 2013
1 parent ac6a8c1 commit b0ccd94
Show file tree
Hide file tree
Showing 9 changed files with 207 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public DefaultNettyHttpBinding(HeaderFilterStrategy headerFilterStrategy) {
public Message toCamelMessage(HttpRequest request, Exchange exchange) throws Exception {
LOG.trace("toCamelMessage: {}", request);

NettyHttpMessage answer = new NettyHttpMessage(request, this);
NettyHttpMessage answer = new NettyHttpMessage(request, null, this);
// force getting headers which will populate them
answer.getHeaders();

Expand Down Expand Up @@ -98,6 +98,46 @@ public void populateCamelHeaders(HttpRequest request, Map<String, Object> header
}
}

@Override
public Message toCamelMessage(HttpResponse response, Exchange exchange) throws Exception {
LOG.trace("toCamelMessage: {}", response);

NettyHttpMessage answer = new NettyHttpMessage(null, response, this);
// force getting headers which will populate them
answer.getHeaders();

// keep the body as is, and use type converters
answer.setBody(response.getContent());
return answer;
}

@Override
public void populateCamelHeaders(HttpResponse response, Map<String, Object> headers, Exchange exchange) throws Exception {
LOG.trace("populateCamelHeaders: {}", response);

headers.put(Exchange.HTTP_RESPONSE_CODE, response.getStatus().getCode());
// TODO: use another status header
headers.put("CamelHttpResponseText", response.getStatus().getReasonPhrase());

for (String name : response.getHeaderNames()) {
// mapping the content-type
if (name.toLowerCase().equals("content-type")) {
name = Exchange.CONTENT_TYPE;
}
// add the headers one by one, and use the header filter strategy
List<String> values = response.getHeaders(name);
Iterator<?> it = ObjectHelper.createIterator(values);
while (it.hasNext()) {
Object extracted = it.next();
LOG.trace("HTTP-header: {}", extracted);
if (headerFilterStrategy != null
&& !headerFilterStrategy.applyFilterToExternalHeaders(name, extracted, exchange)) {
NettyHttpHelper.appendHeader(headers, name, extracted);
}
}
}
}

@Override
public HttpResponse toNettyResponse(Message message) throws Exception {
LOG.trace("toNettyResponse: {}", message);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,27 +37,29 @@
public class HttpClientPipelineFactory extends ClientPipelineFactory {

private static final Logger LOG = LoggerFactory.getLogger(HttpClientPipelineFactory.class);
private NettyProducer producer;
private NettyHttpProducer producer;
private SSLContext sslContext;

public HttpClientPipelineFactory() {
// default constructor needed
}

public HttpClientPipelineFactory(NettyProducer nettyProducer) {
public HttpClientPipelineFactory(NettyHttpProducer nettyProducer) {
this.producer = nettyProducer;
try {
this.sslContext = createSSLContext(producer);
} catch (Exception e) {
throw ObjectHelper.wrapRuntimeCamelException(e);
}

LOG.info("Created SslContext {}", sslContext);
if (sslContext != null) {
LOG.info("Created SslContext {}", sslContext);
}
}

@Override
public ClientPipelineFactory createPipelineFactory(NettyProducer nettyProducer) {
return new HttpClientPipelineFactory(nettyProducer);
return new HttpClientPipelineFactory((NettyHttpProducer) nettyProducer);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,9 @@ public HttpServerPipelineFactory(NettyHttpConsumer nettyConsumer) {
throw ObjectHelper.wrapRuntimeCamelException(e);
}

LOG.info("Created SslContext {}", sslContext);
if (sslContext != null) {
LOG.info("Created SslContext {}", sslContext);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,27 @@ public interface NettyHttpBinding {
void populateCamelHeaders(HttpRequest request, Map<String, Object> headers, Exchange exchange) throws Exception;

/**
* Binds from Camel {@link Message} to Netty {@link org.jboss.netty.handler.codec.http.HttpResponse}.
* Binds from Netty {@link HttpResponse} to Camel {@Message}.
*
* @param response the netty http response
* @param exchange the exchange that should contain the returned message.
* @return the message to store on the given exchange
* @throws Exception is thrown if error during binding
*/
Message toCamelMessage(HttpResponse response, Exchange exchange) throws Exception;

/**
* Binds from Netty {@link HttpResponse} to Camel headers as a {@link Map}.
*
* @param response the netty http response
* @param headers the Camel headers that should be populated
* @param exchange the exchange that should contain the returned message.
* @throws Exception is thrown if error during binding
*/
void populateCamelHeaders(HttpResponse response, Map<String, Object> headers, Exchange exchange) throws Exception;

/**
* Binds from Camel {@link Message} to Netty {@link HttpResponse}.
*
* @param message the Camel message
* @return the http response
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,12 @@ public static Object convertToHttpRequest(Class<?> type, Exchange exchange, Obje
// okay we may need to cheat a bit when we want to grab the HttpRequest as its stored on the NettyHttpMessage
// so if the message instance is a NettyHttpMessage and its body is the value, then we can grab the
// HttpRequest from the NettyHttpMessage
NettyHttpMessage msg = exchange.getIn(NettyHttpMessage.class);
NettyHttpMessage msg;
if (exchange.hasOut()) {
msg = exchange.getOut(NettyHttpMessage.class);
} else {
msg = exchange.getIn(NettyHttpMessage.class);
}
if (msg != null && msg.getBody() == value) {
return msg.getHttpRequest();
}
Expand All @@ -51,6 +56,31 @@ public static Object convertToHttpRequest(Class<?> type, Exchange exchange, Obje
return null;
}

/**
* A fallback converter that allows us to easily call Java beans and use the raw Netty {@link HttpRequest} as parameter types.
*/
@FallbackConverter
public static Object convertToHttpResponse(Class<?> type, Exchange exchange, Object value, TypeConverterRegistry registry) {
// if we want to covert to convertToHttpResponse
if (value != null && HttpResponse.class.isAssignableFrom(type)) {

// okay we may need to cheat a bit when we want to grab the HttpRequest as its stored on the NettyHttpMessage
// so if the message instance is a NettyHttpMessage and its body is the value, then we can grab the
// HttpRequest from the NettyHttpMessage
NettyHttpMessage msg;
if (exchange.hasOut()) {
msg = exchange.getOut(NettyHttpMessage.class);
} else {
msg = exchange.getIn(NettyHttpMessage.class);
}
if (msg != null && msg.getBody() == value) {
return msg.getHttpResponse();
}
}

return null;
}

@Converter
public static String toString(HttpResponse response, Exchange exchange) {
String contentType = response.getHeader(Exchange.CONTENT_TYPE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.camel.RuntimeCamelException;
import org.apache.camel.impl.DefaultMessage;
import org.jboss.netty.handler.codec.http.HttpRequest;
import org.jboss.netty.handler.codec.http.HttpResponse;

/**
* Netty HTTP based {@link org.apache.camel.Message}.
Expand All @@ -31,28 +32,38 @@
public class NettyHttpMessage extends DefaultMessage {

private final transient HttpRequest httpRequest;
private final transient HttpResponse httpResponse;
private final transient NettyHttpBinding httpBinding;

public NettyHttpMessage(HttpRequest httpRequest, NettyHttpBinding httpBinding) {
public NettyHttpMessage(HttpRequest httpRequest, HttpResponse httpResponse, NettyHttpBinding httpBinding) {
this.httpRequest = httpRequest;
this.httpResponse = httpResponse;
this.httpBinding = httpBinding;
}

public HttpRequest getHttpRequest() {
return httpRequest;
}

public HttpResponse getHttpResponse() {
return httpResponse;
}

@Override
protected void populateInitialHeaders(Map<String, Object> map) {
try {
httpBinding.populateCamelHeaders(httpRequest, map, getExchange());
if (httpRequest != null) {
httpBinding.populateCamelHeaders(httpRequest, map, getExchange());
} else {
httpBinding.populateCamelHeaders(httpResponse, map, getExchange());
}
} catch (Exception e) {
throw new RuntimeCamelException("Error populating initial headers", e);
}
}

@Override
public DefaultMessage newInstance() {
return new NettyHttpMessage(httpRequest, httpBinding);
return new NettyHttpMessage(httpRequest, httpResponse, httpBinding);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,26 +16,44 @@
*/
package org.apache.camel.component.netty.http.handlers;

import org.apache.camel.component.netty.NettyProducer;
import org.apache.camel.Exchange;
import org.apache.camel.Message;
import org.apache.camel.component.netty.handlers.ClientChannelHandler;
import org.apache.camel.component.netty.http.NettyHttpProducer;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.handler.codec.http.HttpResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Netty HTTP {@link org.apache.camel.component.netty.handlers.ClientChannelHandler} that handles the response combing
* back from thhe HTTP server, called by this client.
*
*/
public class HttpClientChannelHandler extends ClientChannelHandler {

// use NettyHttpConsumer as logger to make it easier to read the logs as this is part of the producer
// use NettyHttpProducer as logger to make it easier to read the logs as this is part of the producer
private static final transient Logger LOG = LoggerFactory.getLogger(NettyHttpProducer.class);
private final NettyHttpProducer producer;
private HttpResponse response;

public HttpClientChannelHandler(NettyProducer producer) {
public HttpClientChannelHandler(NettyHttpProducer producer) {
super(producer);
this.producer = producer;
}

@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent messageEvent) throws Exception {
super.messageReceived(ctx, messageEvent); //To change body of overridden methods use File | Settings | File Templates.
// store response, as this channel handler is created per pipeline
response = (HttpResponse) messageEvent.getMessage();

super.messageReceived(ctx, messageEvent);
}

@Override
protected Message getResponseMessage(Exchange exchange, MessageEvent messageEvent) throws Exception {
// use the binding
return producer.getEndpoint().getNettyHttpBinding().toCamelMessage(response, exchange);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,10 @@
*/
package org.apache.camel.component.netty.http;

import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;
import org.jboss.netty.handler.codec.http.HttpResponse;
import org.junit.Test;

public class NettyHttpProducerSimpleTest extends BaseNettyTest {
Expand All @@ -31,6 +34,31 @@ public void testHttpSimple() throws Exception {
assertMockEndpointsSatisfied();
}

@Test
public void testHttpSimpleExchange() throws Exception {
getMockEndpoint("mock:input").expectedBodiesReceived("Hello World");

Exchange out = template.request("netty-http:http://localhost:{{port}}/foo", new Processor() {
@Override
public void process(Exchange exchange) throws Exception {
exchange.getIn().setBody("Hello World");
}
});
assertNotNull(out);
assertTrue(out.hasOut());

NettyHttpMessage response = out.getOut(NettyHttpMessage.class);
assertNotNull(response);
assertEquals(200, response.getHttpResponse().getStatus().getCode());

// we can also get the response as body
HttpResponse body = out.getOut().getBody(HttpResponse.class);
assertNotNull(body);
assertEquals(200, body.getStatus().getCode());

assertMockEndpointsSatisfied();
}

@Override
protected RouteBuilder createRouteBuilder() throws Exception {
return new RouteBuilder() {
Expand Down
Loading

0 comments on commit b0ccd94

Please sign in to comment.