Netty

Since Camel 2.14

Both producer and consumer are supported

The Netty component in Camel is a socket communication component, based on the Netty project version 4. Netty is an NIO client-server framework that enables quick and easy development of network applications such as protocol servers and clients. Netty greatly simplifies and streamlines network programming such as TCP and UDP socket servers.

This Camel component supports both producer and consumer endpoints.

The Netty component has several options and allows fine-grained control of a number of TCP/UDP communication parameters (buffer sizes, keepAlive, tcpNoDelay, etc.), and facilitates both In-Only and In-Out communication on a Camel route.

Maven users will need to add the following dependency to their pom.xml for this component:

<dependency>
    <groupId>org.apache.camel</groupId>
    <artifactId>camel-netty</artifactId>
    <version>x.x.x</version>
    <!-- use the same version as your Camel core version -->
</dependency>

URI format

The URI scheme for a netty component is as follows

TCP
netty:tcp://0.0.0.0:99999[?options]
UDP
netty:udp://remotehost:99999/[?options]

This component supports producer and consumer endpoints for both TCP and UDP.
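
As a rough illustration of how the parts of such a URI fit together, the following JDK-only sketch splits an endpoint URI into its transport protocol, host, port, and option string. The class NettyUriParts is hypothetical and is not how Camel resolves endpoints. It also checks that the port lies in the range 0 to 65535, which means the 99999 in the templates above can only be read as a placeholder.

```java
import java.net.URI;

/** Hypothetical helper for illustration only; not part of Camel. */
final class NettyUriParts {
    final String protocol; // "tcp" or "udp"
    final String host;
    final int port;
    final String options;  // raw query string, or null when absent

    private NettyUriParts(String protocol, String host, int port, String options) {
        this.protocol = protocol;
        this.host = host;
        this.port = port;
        this.options = options;
    }

    static NettyUriParts parse(String endpointUri) {
        String prefix = "netty:";
        if (!endpointUri.startsWith(prefix)) {
            throw new IllegalArgumentException("not a netty URI: " + endpointUri);
        }
        // What follows "netty:" is an ordinary hierarchical URI such as tcp://host:port?options
        URI inner = URI.create(endpointUri.substring(prefix.length()));
        int port = inner.getPort();
        if (port < 0 || port > 65535) {
            throw new IllegalArgumentException("port out of range: " + port);
        }
        return new NettyUriParts(inner.getScheme(), inner.getHost(), port, inner.getQuery());
    }
}
```

For example, parsing netty:tcp://0.0.0.0:5150?sync=true&textline=true yields the protocol tcp, the host 0.0.0.0, the port 5150, and the option string sync=true&textline=true.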

Registry-based Options

Codec Handlers and SSL Keystores can be enlisted in the Registry, such as in the Spring XML file. The values that could be passed in are the following:

Name Description

passphrase

password setting to use to encrypt/decrypt payloads sent using SSL

keyStoreFormat

keystore format to be used for payload encryption. Defaults to JKS if not set

securityProvider

Security provider to be used for payload encryption. Defaults to SunX509 if not set.

keyStoreFile

deprecated: Client side certificate keystore to be used for encryption

trustStoreFile

deprecated: Server side certificate keystore to be used for encryption

keyStoreResource

Client side certificate keystore to be used for encryption. It is loaded by default from classpath, but you can prefix with "classpath:", "file:", or "http:" to load the resource from different systems.

trustStoreResource

Server side certificate keystore to be used for encryption. It is loaded by default from classpath, but you can prefix with "classpath:", "file:", or "http:" to load the resource from different systems.

sslHandler

Reference to a class that could be used to return an SSL Handler

encoder

A custom ChannelHandler class that can be used to perform special marshalling of outbound payloads. Must override io.netty.channel.ChannelOutboundHandlerAdapter.

encoders

A list of encoders to be used. You can use a string that has values separated by comma, and have the values be looked up in the Registry. Remember to prefix the value with # so Camel knows it should look up.

decoder

A custom ChannelHandler class that can be used to perform special unmarshalling of inbound payloads. Must override io.netty.channel.ChannelInboundHandlerAdapter.

decoders

A list of decoders to be used. You can use a string that has values separated by comma, and have the values be looked up in the Registry. Remember to prefix the value with # so Camel knows it should look up.

Note

Read below about using non-shareable encoders/decoders.

Using non-shareable encoders or decoders

If your encoders or decoders are not shareable (e.g., they don’t have the @Sharable class annotation), then your encoder/decoder must implement the org.apache.camel.component.netty.ChannelHandlerFactory interface and return a new instance from the newChannelHandler method. This ensures that each channel gets its own encoder/decoder instance. If this is not the case, the Netty component logs a WARN when an endpoint is created.

The Netty component offers an org.apache.camel.component.netty.ChannelHandlerFactories factory class that has a number of commonly used methods.
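
To see why a non-shareable handler needs a factory, consider the plain-Java sketch below. It does not use Netty or Camel: LineAccumulator is a hypothetical stand-in for a stateful decoder, and the Supplier plays the role that newChannelHandler plays in ChannelHandlerFactory. When a single instance serves two connections, the partial data of one leaks into the other; when the factory hands out one instance per connection, the state stays separate.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

/** Hypothetical stateful "decoder": buffers characters until a newline arrives. */
final class LineAccumulator {
    private final StringBuilder partial = new StringBuilder(); // per-connection state

    List<String> feed(String chunk) {
        List<String> lines = new ArrayList<>();
        for (char c : chunk.toCharArray()) {
            if (c == '\n') {
                lines.add(partial.toString());
                partial.setLength(0);
            } else {
                partial.append(c);
            }
        }
        return lines;
    }
}

final class SharingDemo {
    /** Stands in for newChannelHandler(): a fresh instance for every channel. */
    static final Supplier<LineAccumulator> FACTORY = LineAccumulator::new;

    public static void main(String[] args) {
        // One instance shared by two connections: state leaks between them.
        LineAccumulator shared = new LineAccumulator();
        shared.feed("HEL");                        // fragment from connection A
        System.out.println(shared.feed("bye\n"));  // connection B gets [HELbye]

        // One instance per connection: state stays separate.
        LineAccumulator a = FACTORY.get();
        LineAccumulator b = FACTORY.get();
        a.feed("HEL");
        System.out.println(b.feed("bye\n"));       // connection B gets [bye]
    }
}
```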

Sending Messages to/from a Netty endpoint

Netty Producer

In Producer mode, the component provides the ability to send payloads to a socket endpoint using either TCP or UDP protocols (with optional SSL support).

The producer mode supports both one-way and request-response based operations.

Netty Consumer

In Consumer mode, the component provides the ability to:

  • listen to a specified socket using either TCP or UDP protocols (with optional SSL support),

  • receive requests on the socket using text/xml, binary and serialized object-based payloads and

  • send them along on a route as message exchanges.

The consumer mode supports both one-way and request-response based operations.

Using Multiple Codecs

In certain cases, it may be necessary to add chains of encoders and decoders to the netty pipeline. To add multiple codecs to a Camel netty endpoint, use the encoders and decoders URI parameters. Like the encoder and decoder parameters, they are used to supply references (lists of inbound and outbound ChannelHandlers) that should be added to the pipeline.

Note that if encoders is specified, the encoder parameter is ignored; likewise, if decoders is specified, the decoder parameter is ignored.

Note

Read further about using non-shareable encoders/decoders.

The lists of codecs need to be added to Camel’s registry, so they can be resolved when the endpoint is created.

ChannelHandlerFactory lengthDecoder = ChannelHandlerFactories.newLengthFieldBasedFrameDecoder(1048576, 0, 4, 0, 4);

StringDecoder stringDecoder = new StringDecoder();
registry.bind("length-decoder", lengthDecoder);
registry.bind("string-decoder", stringDecoder);

LengthFieldPrepender lengthEncoder = new LengthFieldPrepender(4);
StringEncoder stringEncoder = new StringEncoder();
registry.bind("length-encoder", lengthEncoder);
registry.bind("string-encoder", stringEncoder);

List<ChannelHandler> decoders = new ArrayList<ChannelHandler>();
decoders.add(lengthDecoder);
decoders.add(stringDecoder);

List<ChannelHandler> encoders = new ArrayList<ChannelHandler>();
encoders.add(lengthEncoder);
encoders.add(stringEncoder);

registry.bind("encoders", encoders);
registry.bind("decoders", decoders);
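
To make the frame-codec arguments above concrete, here is a JDK-only sketch of the wire format they imply. LengthFieldPrepender(4) writes a 4-byte big-endian length in front of each payload. A frame decoder configured with the arguments (1048576, 0, 4, 0, 4), that is maximum frame length, length field offset, length field length, length adjustment, and initial bytes to strip, reads that length and drops the prefix. The class LengthFraming is a hypothetical illustration, not Netty code, and it simplifies details such as how the maximum length is enforced.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical illustration of 4-byte length-prefixed framing; not Netty code. */
final class LengthFraming {
    static final int MAX_FRAME_LENGTH = 1048576; // same limit as in the example above
    static final int LENGTH_FIELD_BYTES = 4;

    /** Encode: 4-byte big-endian payload length followed by the UTF-8 payload. */
    static byte[] encode(String text) {
        byte[] payload = text.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(LENGTH_FIELD_BYTES + payload.length)
                .putInt(payload.length) // ByteBuffer is big-endian by default
                .put(payload)
                .array();
    }

    /** Decode every complete frame in the stream, dropping the length prefix. */
    static List<String> decode(byte[] stream) {
        ByteBuffer in = ByteBuffer.wrap(stream);
        List<String> messages = new ArrayList<>();
        while (in.remaining() >= LENGTH_FIELD_BYTES) {
            in.mark();
            int length = in.getInt();
            if (length < 0 || length > MAX_FRAME_LENGTH) {
                throw new IllegalArgumentException("bad frame length: " + length);
            }
            if (in.remaining() < length) {
                in.reset(); // incomplete frame: a real decoder would wait for more bytes
                break;
            }
            byte[] payload = new byte[length];
            in.get(payload);
            messages.add(new String(payload, StandardCharsets.UTF_8));
        }
        return messages;
    }
}
```

Because each frame carries its own length, two messages written back to back on the same TCP stream can be separated again on the receiving side, which is what lets the string decoder that follows see one complete message at a time.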

Spring’s native collections support can be used to specify the codec lists in an application context

<util:list id="decoders" list-class="java.util.LinkedList">
    <bean class="org.apache.camel.component.netty.ChannelHandlerFactories" factory-method="newLengthFieldBasedFrameDecoder">
        <constructor-arg value="1048576"/>
        <constructor-arg value="0"/>
        <constructor-arg value="4"/>
        <constructor-arg value="0"/>
        <constructor-arg value="4"/>
    </bean>
    <bean class="io.netty.handler.codec.string.StringDecoder"/>
</util:list>

<util:list id="encoders" list-class="java.util.LinkedList">
    <bean class="io.netty.handler.codec.LengthFieldPrepender">
        <constructor-arg value="4"/>
    </bean>
    <bean class="io.netty.handler.codec.string.StringEncoder"/>
</util:list>

<bean id="length-encoder" class="io.netty.handler.codec.LengthFieldPrepender">
    <constructor-arg value="4"/>
</bean>
<bean id="string-encoder" class="io.netty.handler.codec.string.StringEncoder"/>

<bean id="length-decoder" class="org.apache.camel.component.netty.ChannelHandlerFactories" factory-method="newLengthFieldBasedFrameDecoder">
    <constructor-arg value="1048576"/>
    <constructor-arg value="0"/>
    <constructor-arg value="4"/>
    <constructor-arg value="0"/>
    <constructor-arg value="4"/>
</bean>
<bean id="string-decoder" class="io.netty.handler.codec.string.StringDecoder"/>

The bean names can then be used in netty endpoint definitions either as a comma-separated list or contained in a list, e.g.:

Java
 from("direct:multiple-codec").to("netty:tcp://0.0.0.0:{{port}}?encoders=#encoders&sync=false");

 from("netty:tcp://0.0.0.0:{{port}}?decoders=#length-decoder,#string-decoder&sync=false").to("mock:multiple-codec");
XML
<camelContext id="multiple-netty-codecs-context" xmlns="http://camel.apache.org/schema/spring">
    <route>
        <from uri="direct:multiple-codec"/>
        <to uri="netty:tcp://0.0.0.0:5150?encoders=#encoders&amp;sync=false"/>
    </route>
    <route>
        <from uri="netty:tcp://0.0.0.0:5150?decoders=#length-decoder,#string-decoder&amp;sync=false"/>
        <to uri="mock:multiple-codec"/>
    </route>
</camelContext>

Closing Channel When Complete

When acting as a server, you sometimes want to close the channel when, for example, a client conversation is finished. You can do this by simply setting the endpoint option disconnect=true.

However, you can also instruct Camel on a per-message basis as follows. To instruct Camel to close the channel, you should add a header with the key CamelNettyCloseChannelWhenComplete set to a boolean true value. For instance, the example below will close the channel after it has written the bye message back to the client:

from("netty:tcp://0.0.0.0:8080").process(new Processor() {
    public void process(Exchange exchange) throws Exception {
        String body = exchange.getIn().getBody(String.class);
        exchange.getOut().setBody("Bye " + body);
        // some condition that determines if we should close
        if (close) {
            exchange.getOut().setHeader(NettyConstants.NETTY_CLOSE_CHANNEL_WHEN_COMPLETE, true);
        }
    }
});

Adding custom channel pipeline factories to gain complete control over a created pipeline

Custom pipeline

Custom channel pipelines give the user complete control over the handler/interceptor chain: custom handlers, encoders, and decoders can be inserted in a straightforward way, without having to specify them in the Netty endpoint URL.

To add a custom pipeline, a custom channel pipeline factory must be created and registered with the context via the context registry (or the camel-spring ApplicationContextRegistry, etc).

A custom pipeline factory must be constructed as follows

  • A Producer-linked channel pipeline factory must extend the abstract class ClientInitializerFactory.

  • A Consumer-linked channel pipeline factory must extend the abstract class ServerInitializerFactory.

  • The classes should override the initChannel() method to insert custom handler(s), encoder(s) and decoder(s). Not overriding the initChannel() method creates a pipeline with no handlers, encoders or decoders wired to the pipeline.

The example below shows how a ServerInitializerFactory may be created

Using custom pipeline factory

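public class SampleServerInitializerFactory extends ServerInitializerFactory {
    private final int maxLineSize = 1024;
    private final NettyConsumer consumer;

    // the instance bound in the registry has no consumer yet
    public SampleServerInitializerFactory() {
        this(null);
    }

    public SampleServerInitializerFactory(NettyConsumer consumer) {
        this.consumer = consumer;
    }

    @Override
    public ServerInitializerFactory createPipelineFactory(NettyConsumer consumer) {
        // called by Camel to obtain a factory bound to the actual consumer
        return new SampleServerInitializerFactory(consumer);
    }

    @Override
    protected void initChannel(Channel ch) throws Exception {
        ChannelPipeline channelPipeline = ch.pipeline();

        channelPipeline.addLast("encoder-SD", new StringEncoder(CharsetUtil.UTF_8));
        channelPipeline.addLast("decoder-DELIM", new DelimiterBasedFrameDecoder(maxLineSize, true, Delimiters.lineDelimiter()));
        channelPipeline.addLast("decoder-SD", new StringDecoder(CharsetUtil.UTF_8));
        // here we add the default Camel ServerChannelHandler for the consumer, to allow Camel to route the message, etc.
        channelPipeline.addLast("handler", new ServerChannelHandler(consumer));
    }
}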

The custom channel pipeline factory can then be added to the registry and instantiated/utilized on a Camel route in the following way

Registry registry = context.getRegistry();
ServerInitializerFactory factory = new SampleServerInitializerFactory();
registry.bind("spf", factory);
context.addRoutes(new RouteBuilder() {
  public void configure() {
      String netty_endpoint =
         "netty:tcp://0.0.0.0:5150?serverInitializerFactory=#spf";
      String return_string =
         "When You Go Home, Tell Them Of Us And Say,"
         + "For Your Tomorrow, We Gave Our Today.";

      from(netty_endpoint)
       .process(new Processor() {
          public void process(Exchange exchange) throws Exception {
            exchange.getOut().setBody(return_string);
          }
       });
  }
});

Reusing Netty boss and worker thread pools

Netty has two kinds of thread pools: boss and worker. By default, each Netty consumer and producer has its own private thread pools. If you want to reuse these thread pools among multiple consumers or producers, then the thread pools must be created and enlisted in the Registry.

For example, using Spring XML we can create a shared worker thread pool using the NettyWorkerPoolBuilder with two worker threads as shown below:

<!-- use the worker pool builder to help create the shared thread pool -->
<bean id="poolBuilder" class="org.apache.camel.component.netty.NettyWorkerPoolBuilder">
  <property name="workerCount" value="2"/>
</bean>

<!-- the shared worker thread pool -->
<bean id="sharedPool" class="io.netty.channel.EventLoopGroup"
      factory-bean="poolBuilder" factory-method="build" destroy-method="shutdown">
</bean>

Tip

For the boss thread pool, there is an org.apache.camel.component.netty.NettyServerBossPoolBuilder builder for Netty consumers and an org.apache.camel.component.netty.NettyClientBossPoolBuilder for Netty producers.

Then in the Camel routes we can refer to this worker pool by configuring the workerPool option in the URI as shown below:

<route>
  <from uri="netty:tcp://0.0.0.0:5021?textline=true&amp;sync=true&amp;workerPool=#sharedPool&amp;usingExecutorService=false"/>
  <to uri="log:result"/>
  ...
</route>

And if we have another route, we can refer to the shared worker pool:

<route>
  <from uri="netty:tcp://0.0.0.0:5022?textline=true&amp;sync=true&amp;workerPool=#sharedPool&amp;usingExecutorService=false"/>
  <to uri="log:result"/>
  ...
</route>

And so forth.

Multiplexing concurrent messages over a single connection with request/reply

When using Netty for request/reply messaging via the netty producer, by default each message is sent via a non-shared (pooled) connection. This ensures that replies are automatically mapped to the correct request thread for further routing in Camel. In other words, correlation between request/reply messages happens out-of-the-box because the replies come back on the same connection that was used for sending the request, and this connection is not shared with others. When the response comes back, the connection is returned to the connection pool, where it can be reused by others.

However, if you want to multiplex concurrent requests/responses on a single shared connection, then you need to turn off connection pooling by setting producerPoolEnabled=false. This introduces a potential issue with interleaved responses if replies come back out of order. Therefore, you need a correlation id in both the request and reply messages, so that each reply can be correlated to the Camel callback that is responsible for continuing to process the message in Camel. To do this, implement NettyCamelStateCorrelationManager as the correlation manager and configure it via the correlationManager=#myManager option.

Note

We recommend extending the TimeoutCorrelationManagerSupport when you build custom correlation managers. This provides support for timeout and other complexities you otherwise would need to implement as well.

You can find an example with the Apache Camel source code in the examples directory under the camel-example-netty-custom-correlation directory.
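
The idea behind correlating replies on a shared connection can be sketched with plain JDK classes. The ReplyCorrelator below is hypothetical and does not implement Camel's NettyCamelStateCorrelationManager interface; it only shows the bookkeeping: each request registers a pending entry under its correlation id before it is sent, and each reply completes the entry with the matching id, regardless of arrival order.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical in-memory correlator; not Camel's correlation manager API. */
final class ReplyCorrelator {
    private final Map<String, CompletableFuture<String>> pending = new ConcurrentHashMap<>();

    /** Register a request before it is written to the shared connection. */
    CompletableFuture<String> register(String correlationId) {
        CompletableFuture<String> reply = new CompletableFuture<>();
        if (pending.putIfAbsent(correlationId, reply) != null) {
            throw new IllegalStateException("duplicate correlation id: " + correlationId);
        }
        return reply;
    }

    /** Called for each reply read from the connection, in whatever order it arrives. */
    boolean complete(String correlationId, String body) {
        CompletableFuture<String> reply = pending.remove(correlationId);
        return reply != null && reply.complete(body);
    }

    int pendingCount() {
        return pending.size();
    }

    public static void main(String[] args) {
        ReplyCorrelator correlator = new ReplyCorrelator();
        CompletableFuture<String> first = correlator.register("req-1");
        CompletableFuture<String> second = correlator.register("req-2");

        // Replies arrive out of order on the shared connection.
        correlator.complete("req-2", "reply B");
        correlator.complete("req-1", "reply A");

        System.out.println(first.join());  // reply A
        System.out.println(second.join()); // reply B
    }
}
```

A real correlation manager must also expire entries whose reply never arrives, which is the kind of concern the TimeoutCorrelationManagerSupport base class mentioned above takes care of.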

Native transport

To enable native transport, you need to add an additional dependency for epoll or kqueue, depending on your OS and CPU architecture. To make this easier, add the following extension to the build section of your pom.xml:

<extensions>
    <extension>
        <groupId>kr.motd.maven</groupId>
        <artifactId>os-maven-plugin</artifactId>
    </extension>
</extensions>

Then add the following dependency:

Linux/Unix
<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-transport-native-epoll</artifactId>
    <classifier>${os.detected.classifier}</classifier>
</dependency>
MacOS/BSD
<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-transport-native-kqueue</artifactId>
    <classifier>${os.detected.classifier}</classifier>
</dependency>

Examples

A UDP Netty endpoint using Request-Reply and serialized object payload

Note that Object serialization is not allowed by default, and so a decoder must be configured.

@BindToRegistry("decoder")
public ChannelHandler getDecoder() throws Exception {
    return new DefaultChannelHandlerFactory() {
        @Override
        public ChannelHandler newChannelHandler() {
            return new DatagramPacketObjectDecoder(ClassResolvers.weakCachingResolver(null));
        }
    };
}

RouteBuilder builder = new RouteBuilder() {
  public void configure() {
    from("netty:udp://0.0.0.0:5155?sync=true&decoders=#decoder")
      .process(new Processor() {
         public void process(Exchange exchange) throws Exception {
           Poetry poetry = (Poetry) exchange.getIn().getBody();
           // Process poetry in some way
           exchange.getOut().setBody("Message received");
         }
       });
    }
};

A TCP-based Netty consumer endpoint using One-way communication

RouteBuilder builder = new RouteBuilder() {
  public void configure() {
       from("netty:tcp://0.0.0.0:5150")
           .to("mock:result");
  }
};

An SSL/TCP-based Netty consumer endpoint using Request-Reply communication

Using the JSSE Configuration Utility

The Netty component supports SSL/TLS configuration through the Camel JSSE Configuration Utility. This utility greatly decreases the amount of component-specific code you need to write and is configurable at the endpoint and component levels. The following examples demonstrate how to use the utility with the Netty component.

Programmatic configuration of the component

KeyStoreParameters ksp = new KeyStoreParameters();
ksp.setResource("/users/home/server/keystore.jks");
ksp.setPassword("keystorePassword");

KeyManagersParameters kmp = new KeyManagersParameters();
kmp.setKeyStore(ksp);
kmp.setKeyPassword("keyPassword");

SSLContextParameters scp = new SSLContextParameters();
scp.setKeyManagers(kmp);

NettyComponent nettyComponent = getContext().getComponent("netty", NettyComponent.class);
nettyComponent.getConfiguration().setSslContextParameters(scp);

Spring DSL based configuration of endpoint

...
  <camel:sslContextParameters
      id="sslContextParameters">
    <camel:keyManagers
        keyPassword="keyPassword">
      <camel:keyStore
          resource="/users/home/server/keystore.jks"
          password="keystorePassword"/>
    </camel:keyManagers>
  </camel:sslContextParameters>
...
  <to uri="netty:tcp://0.0.0.0:5150?sync=true&amp;ssl=true&amp;sslContextParameters=#sslContextParameters"/>
...

Using Basic SSL/TLS configuration on the Netty Component

Registry registry = context.getRegistry();
registry.bind("password", "changeit");
registry.bind("ksf", new File("src/test/resources/keystore.jks"));
registry.bind("tsf", new File("src/test/resources/keystore.jks"));

context.addRoutes(new RouteBuilder() {
  public void configure() {
      String netty_ssl_endpoint =
         "netty:tcp://0.0.0.0:5150?sync=true&ssl=true&passphrase=#password"
         + "&keyStoreFile=#ksf&trustStoreFile=#tsf";
      String return_string =
         "When You Go Home, Tell Them Of Us And Say,"
         + "For Your Tomorrow, We Gave Our Today.";

      from(netty_ssl_endpoint)
       .process(new Processor() {
          public void process(Exchange exchange) throws Exception {
            exchange.getOut().setBody(return_string);
          }
       });
  }
});
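
The default values named in the registry-based options, JKS for keyStoreFormat and SunX509 for securityProvider, are standard JSSE identifiers. The sketch below shows how such values are typically used with the plain JDK API. It assumes, without confirmation from the component source, that the component applies them in a similar way. The class JsseDefaultsSketch is hypothetical, and it loads an empty keystore so that it runs without any file on disk.

```java
import java.io.IOException;
import java.security.GeneralSecurityException;
import java.security.KeyStore;
import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.SSLContext;

/** Hypothetical sketch of what "JKS" and "SunX509" mean in JSSE terms. */
final class JsseDefaultsSketch {
    static SSLContext build(char[] passphrase) {
        try {
            // keyStoreFormat: the keystore type
            KeyStore keyStore = KeyStore.getInstance("JKS");
            // A real setup passes an InputStream for the keystore file;
            // null creates an empty keystore.
            keyStore.load(null, passphrase);

            // "SunX509" is the name of a KeyManagerFactory algorithm
            KeyManagerFactory kmf = KeyManagerFactory.getInstance("SunX509");
            kmf.init(keyStore, passphrase);

            SSLContext context = SSLContext.getInstance("TLS");
            context.init(kmf.getKeyManagers(), null, null); // default trust managers and randomness
            return context;
        } catch (GeneralSecurityException | IOException e) {
            throw new IllegalStateException("could not build SSLContext", e);
        }
    }
}
```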

Getting access to SSLSession and the client certificate

You can get access to the javax.net.ssl.SSLSession if you need, for example, to get details about the client certificate. When ssl=true, the Netty component stores the SSLSession as a header on the Camel Message, as shown below:

SSLSession session = exchange.getIn().getHeader(NettyConstants.NETTY_SSL_SESSION, SSLSession.class);
// get the first certificate which is client certificate
javax.security.cert.X509Certificate cert = session.getPeerCertificateChain()[0];
Principal principal = cert.getSubjectDN();

Remember to set needClientAuth=true to authenticate the client, otherwise SSLSession cannot access information about the client certificate, and you may get an exception javax.net.ssl.SSLPeerUnverifiedException: peer not authenticated. You may also get this exception if the client certificate is expired or not valid, etc.
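
The snippet above relies on javax.security.cert.X509Certificate and SSLSession.getPeerCertificateChain(), both of which are deprecated in current JDKs. A possible alternative, sketched here with the hypothetical helper class ClientCertificates, uses getPeerCertificates() and java.security.cert.X509Certificate instead, and returns an empty result rather than propagating SSLPeerUnverifiedException when the client was not authenticated.

```java
import java.security.cert.Certificate;
import java.security.cert.X509Certificate;
import java.util.Optional;
import javax.net.ssl.SSLPeerUnverifiedException;
import javax.net.ssl.SSLSession;

/** Hypothetical helper; reads the client certificate subject from an SSLSession. */
final class ClientCertificates {
    static Optional<String> subjectName(SSLSession session) {
        try {
            // The peer's own certificate comes first in the chain.
            Certificate[] chain = session.getPeerCertificates();
            if (chain.length > 0 && chain[0] instanceof X509Certificate) {
                X509Certificate cert = (X509Certificate) chain[0];
                return Optional.of(cert.getSubjectX500Principal().getName());
            }
            return Optional.empty();
        } catch (SSLPeerUnverifiedException e) {
            // Thrown when the peer was not authenticated, e.g. needClientAuth was not set.
            return Optional.empty();
        }
    }
}
```

In a processor, the session obtained from the NettyConstants.NETTY_SSL_SESSION header could be passed to subjectName in place of the direct certificate access.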

Tip

The option sslClientCertHeaders can be set to true which then enriches the Camel Message with headers having details about the client certificate. For example, the subject name is readily available in the header CamelNettySSLClientCertSubjectName.
