This repository has been archived by the owner on Jul 9, 2022. It is now read-only.

DSL User's Guide

dturanski edited this page Oct 27, 2012 · 2 revisions

Introduction

The Groovy DSL for Spring Integration is essentially a facade for Spring Integration. This guide assumes you are familiar with Spring Integration. If this is not the case, the SI Reference is a good place to begin.

The DSL provides a simple way to embed Spring Integration into your application using the Groovy Builder pattern in lieu of Spring XML. This should readily appeal to Groovy developers who have a need for Spring Integration, for example, in a Grails application. In addition, a DSL script may be loaded as an external Groovy script, allowing it to be executed from a Java class. The cafe example included in the spring-integration-dsl-groovy-samples module is a good example of using the DSL in a Java application.

How it Works

DSL scripts are parsed by the IntegrationBuilder, the core component, which extends FactoryBuilderSupport. The builder pattern is used to express arbitrarily complex structures as a hierarchy of methods that may accept closures as arguments. A closure may invoke methods which also accept a closure argument, and so on. This is syntactically valid Groovy, even though the methods are not implemented. The builder traverses the structure and delegates to factory components, each registered to handle one or more methods. The builder passes the method name and arguments, along with additional context, so that the factory may perform whatever tasks are needed. Since the structure is valid Groovy code, it is very easy to inject your own code within these closures. Any such code is executed in line. Only undefined methods will be parsed and processed by the builder.
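The interception of undefined methods can be illustrated with a minimal sketch in plain Groovy. It relies on methodMissing rather than FactoryBuilderSupport, so it only approximates what IntegrationBuilder does. RecordingBuilder and its calls list are hypothetical names invented for this illustration and are not part of the DSL.

```groovy
// Hypothetical recorder, not part of the DSL: it collects the names of undefined methods.
class RecordingBuilder {
    List<String> calls = []

    // Groovy routes every call to an undefined method here.
    def methodMissing(String name, args) {
        calls << name
        def argList = args as List
        def last = argList ? argList[-1] : null
        // A trailing closure is treated as a nested structure and evaluated against this builder.
        if (last instanceof Closure) {
            last.delegate = this
            last.resolveStrategy = Closure.DELEGATE_FIRST
            last.call()
        }
        null
    }
}

def builder = new RecordingBuilder()
def log = []
builder.messageFlow {
    log << 'inline code'   // ordinary Groovy, executed in line
    transform()            // undefined, so it is handed to the builder
    filter()
}

assert builder.calls == ['messageFlow', 'transform', 'filter']
assert log == ['inline code']
```

The real builder does considerably more with each intercepted call (it delegates to a registered factory), but the split between inline code and intercepted methods is the same idea.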

The DSL for Spring Integration uses the builder to translate a Groovy script to the equivalent Spring Integration XML. Please see DSL to Spring XML Mapping for details on this topic.

However, the DSL offers more than syntactic sugar on top of XML. One of its most compelling features is the ability to define inline Groovy closures to implement endpoint logic, eliminating the need for external classes to implement custom logic. In some sense, Spring Integration's support for the Spring Expression Language (SpEL) and inline scripting address this, but Groovy closures are easier and much more powerful.

Executing Groovy closures within Spring Integration requires integration with internal components as well as some type handling logic to permit closures to access the message payload, headers, or the entire message. Please see Working with Closures for details on this topic.
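As a rough sketch of what such type handling could look like, the following plain Groovy snippet inspects the declared type of a closure's first parameter to decide whether to pass the headers or the payload. It follows the rule described later in this guide (a Map-typed parameter receives the headers unless the payload is itself a Map). The helper name callWithMessagePart is hypothetical, and the actual DSL implementation may differ.

```groovy
// Hypothetical helper, not the DSL's implementation.
// Assumes the closure declares exactly one parameter (explicit or the implicit 'it').
def callWithMessagePart(Closure endpoint, Map headers, Object payload) {
    Class declaredType = endpoint.parameterTypes[0]
    // A Map-typed parameter receives the headers, unless the payload itself is a Map.
    def argument = (declaredType == Map && !(payload instanceof Map)) ? headers : payload
    endpoint.call(argument)
}

def headers = [foo: 'bar']

// Typed as Map: the closure sees the headers.
assert callWithMessagePart({ Map h -> h.foo }, headers, 'text') == 'bar'
// Untyped: the closure sees the payload.
assert callWithMessagePart({ it.toUpperCase() }, headers, 'text') == 'TEXT'
// Typed as Map, but the payload is a Map: the payload wins.
assert callWithMessagePart({ Map m -> m.foo }, headers, [foo: 'payload']) == 'payload'
```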

DSL Basics

The core DSL module, spring-integration-dsl-groovy-core, includes the IntegrationBuilder. This is the central component for processing the DSL.

The core module also provides common EIP components for message based applications, such as channels, endpoints, and channel interceptors.

Endpoints are expressed as verbs in the DSL to improve readability. The following list includes the common DSL method names and the associated EIP endpoint:

  • transform -> Transformer
  • filter -> Filter
  • handle -> ServiceActivator
  • split -> Splitter
  • aggregate -> Aggregator
  • poll -> Poller
  • bridge -> Bridge

Conceptually, integration processes are constructed by composing these endpoints into one or more message flows. Note that EIP does not formally define the term 'message flow', but it is useful to think of it as a unit of work that uses well known messaging patterns. The DSL provides a MessageFlow type to define a simple composition with one input channel and zero or more output channels. Let's start with a simple example:

def builder = new IntegrationBuilder()
def flow = builder.messageFlow {
     transform {"hello, $it!"}
}

assert (flow.sendAndReceive("world") == "hello, world!")

Here we used the IntegrationBuilder to create a MessageFlow.

The IntegrationBuilder accepts closures as endpoint arguments to operate on the message. For example, the transform method takes a closure argument which represents the endpoint action (Note: The closure is not technically required since the normal Spring Integration 'ref' and 'method' pattern is a valid alternative).

Under the covers, IntegrationBuilder recognizes the endpoint action as something it should not attempt to parse or execute immediately. Instead, the builder creates a reference to the closure, which the endpoint invokes later to process a message.

Notice that we have not defined any channels. The message flow takes care of this. It automatically creates an input channel and assigns it to the first endpoint by default. If the message flow contains multiple endpoints, they will automatically be wired together using direct channels or queue channels as appropriate to operate in sequence. The message flow's sendAndReceive method sends a message to the flow's input channel and receives the reply. Let's look at another example:

def flow = builder.messageFlow(inputChannel:'inputChannel') {
     filter {it == 'World'}
     transform {'Hello ' + it}
     handle {println "****************** $it ***************" }
}

The above example composes a sequence of Filter->Transformer->Service Activator. The flow is 'one way'; that is, it does not return a value but simply prints the payload to STDOUT. The endpoints are automatically wired together using direct channels.

Channel Naming Conventions

All endpoints may explicitly define input and output channels using the inputChannel and outputChannel named parameters, as illustrated in the messageFlow above. By a second convention, if a name is provided for the message flow (either as an unnamed parameter or as the named parameter name), its input channel will be named <name>.inputChannel. The following example uses the doWithSpringIntegration closure to illustrate this point. This returns an IntegrationContext, which represents a top level enclosure for one or more message flows, channel definitions, Spring bean configuration, and so on. The input channel for the message flow named 'flow' is automatically generated as 'flow.inputChannel'. The transformer explicitly defines an input channel named 'transformerChannel'. The IntegrationContext provides methods to send a message to any named channel. Thus, the second call to send() sends a message directly to the transformer, bypassing the filter.

def builder = new IntegrationBuilder()
def ic = builder.doWithSpringIntegration {
    messageFlow('flow') {
      filter {it == 'World'}
      transform(inputChannel:'transformerChannel') {'Hello ' + it}
      handle {println "****************** $it ***************" }
    }
}
	
ic.send('flow.inputChannel','World')
ic.send('transformerChannel','Earth')

Breaking The Chain

Simple endpoints may be configured to disable chaining when contained within a message flow. Consider the following:

def flow = builder.messageFlow {
    route('myRouter', { Map headers -> headers.foo }) {
        map(bar:'barChannel',baz:'bazChannel')
    }
     
    transform(inputChannel:'barChannel',{it[0..1]},linkToNext:false)
    transform(inputChannel:'bazChannel',{it*2})
}

In the example above, the message flow contains a router and two transformers. By default, a message routed to the first transformer would then be passed to the second transformer. Rather, the intention is for the router to act as a switch to one of the transformers based on the value of the 'foo' message header. Setting linkToNext to false essentially terminates the flow. Thus one message flow can have multiple distinct paths.

Simple endpoints such as Transformer, Filter, ServiceActivator, and Bridge are straightforward to use so will not be discussed further. The main points to keep in mind:

  • A transform() closure must return a result (Note that Spring Integration does not dispatch messages with null payloads)
  • A handle() closure at the end of a message flow need not return a result
  • A filter() should return a boolean result (or a Groovy truth value)

The next sections discuss selected endpoints which require further explanation.

Message Routers

Spring Integration natively provides specialized router types including:

  • HeaderValueRouter
  • PayloadTypeRouter
  • ExceptionTypeRouter
  • RecipientListRouter

The DSL can use a more generic approach to accomplish the same things. For example, a HeaderValueRouter is implemented in the previous example:

route('myRouter', { Map headers -> headers.foo }) {
    map(bar:'barChannel',baz:'bazChannel')
 }

The above is an instance of a more general routing pattern: Evaluate an expression and provide a channel map associating each possible result with a corresponding channel. With dynamic typing, the same basic construct may be used to evaluate a message header, the payload, or any general expression available to the closure. In this case, simply declaring the closure argument type to be a Map causes the DSL to assign the message headers to the closure argument (unless the payload is also a Map - then it will be used instead). Note that route accepts a second closure to define routing logic. PayloadType and ExceptionType routing are easily implemented using instanceof in the evaluation closure. A Recipient List router is implemented simply by returning a list containing channel names:

def count = 0 
def flow = builder.messageFlow {
    route('myRouter', { ['upper.inputChannel' ,'lower.inputChannel'] } )
    handle('upper', {count ++; null})
    handle('lower', {count ++; null})
}

flow.send('Hello')
assert count == 2
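The general pattern described above (evaluate an expression, then look the result up in a channel map, or treat a list result as a recipient list) can be sketched in plain Groovy. This is an illustration only: resolveChannels is a hypothetical helper, messages are modelled as simple maps, and falling back to the result itself when the map has no entry is an assumption made for the sketch.

```groovy
// Hypothetical stand-in for a router; messages are plain maps with 'headers' and 'payload'.
def resolveChannels(Closure evaluate, Map channelMap, message) {
    def result = evaluate(message)
    // A collection is treated as a recipient list; anything else is a single routing key.
    def keys = result instanceof Collection ? result : [result]
    // Assumption: a key without a mapping is taken to be a channel name already.
    keys.collect { channelMap.get(it) ?: it }
}

def channelMap = [bar: 'barChannel', baz: 'bazChannel']
def message = [headers: [foo: 'bar'], payload: 'Hello']

// Header value routing
assert resolveChannels({ it.headers.foo }, channelMap, message) == ['barChannel']

// Payload type routing using instanceof
def byType = { it.payload instanceof String ? 'bar' : 'baz' }
assert resolveChannels(byType, channelMap, message) == ['barChannel']

// Recipient list routing
def recipients = { ['upper.inputChannel', 'lower.inputChannel'] }
assert resolveChannels(recipients, [:], message) == ['upper.inputChannel', 'lower.inputChannel']
```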

An alternative to implementing routing logic with a map or recipient list is the when ... otherwise construct. A router may contain one or more when closures followed by an optional otherwise closure. Each closure contains an inline message flow which in turn can contain routers, etc. Here are some examples:

route('myRouter', { it == 'Hello' ? 'foo' : 'bar' } ) {
    when('foo') {
         handle {payload -> payload.toUpperCase()}
        // you can add more endpoints here as in a message flow
    }

    when('bar') {
         handle {payload -> payload.toLowerCase()}
    }
}

route('myRouter', { if (it == 'Hello' ) 'foo' } ) {
    when('foo') {
        handle {payload -> payload.toUpperCase()}
    }

    otherwise {
        handle {payload -> payload.toLowerCase()}
    }
}

Splitters

A splitter is created using the split() method. By default, if the payload is a Collection or Array, the splitter outputs each item as an individual message. The method also takes a closure if you require custom logic to convert the input to a Collection or Array.

Note that IntegrationBuilder overrides Groovy's default split() method, which works on Strings, Collections, etc. The difference is subtle: out of the box, Groovy would try to execute split() immediately, which is not what we want. Instead, we want split() to build a Splitter which will invoke the given closure whenever a message is dispatched to its input channel. For example:

split {it.split(',')}  

This creates a splitter that splits a message containing a comma delimited String.

Aggregators

An Aggregator is conceptually the converse of a splitter. It aggregates a sequence of individual messages into a single message and is necessarily more complex. By default, an aggregator will return a message containing a collection of payloads from incoming messages:

def flow = builder.messageFlow {
    split()
    aggregate()
}

def result = flow.sendAndReceive([1, 2])
assert result == [1, 2] 

The above is essentially an identity transformation for a list of integers. The original payload contains the list [1, 2]. The splitter splits the list into two individual messages (m1.payload == 1 and m2.payload == 2). The aggregator then collects them into a list again.

However, you may change the default behavior by specifying a release strategy and correlation strategy, among other things. Consider the following:

def flow = builder.messageFlow {
   split()
   aggregate(releaseStrategy:{list-> (list.sum() >= 6) })
}

def result = flow.sendAndReceive([1, 2, 3, 4])
assert result == [1, 2, 3]

The release strategy closure causes the aggregator to collect integers and release the collection whenever the sum is 6 or more. Below is another example. We want to separate a list of consecutive integers into odds and evens:

def list = (1..8)
def flow = builder.messageFlow(outputChannel:'queueChannel') {
     queueChannel('queueChannel')
     split()
     aggregate(
         releaseStrategy:{ agg -> agg.size() == list.size()/2 },
         correlationStrategy:{it % 2 ? 'odd' : 'even' })
 }

 flow.send(list)
 
 def result = flow.receive()
 assert result.payload == [1, 3, 5, 7]
 result = flow.receive()
 assert result.payload == [2, 4, 6, 8]

In the above example, the message flow's output channel is configured as a queue channel to allow multiple messages to be received for a single input message. Note that receive() will block until a message arrives. You can also specify a timeout value in ms to guarantee the call will return if no message is present.
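The interplay of the correlation strategy and release strategy in the two aggregator examples can be traced with a small plain Groovy simulation. This is a sketch of the concept only, not Spring Integration's aggregator: the aggregate function below is hypothetical, processes items sequentially, and ignores timeouts and message headers.

```groovy
// Hypothetical stand-in for an aggregator, not the Spring Integration implementation.
def aggregate(List items, Closure correlationStrategy, Closure releaseStrategy) {
    def groups = [:]     // correlation key -> payloads collected so far
    def released = []    // completed groups, in the order they were released
    items.each { item ->
        def key = correlationStrategy(item)
        def group = groups.get(key, [])   // Groovy's get(key, default) also stores the default
        group << item
        if (releaseStrategy(group)) {
            released << group
            groups.remove(key)
        }
    }
    released
}

// Release once the running sum reaches 6; every item shares one correlation key.
assert aggregate([1, 2, 3, 4], { 'all' }, { it.sum() >= 6 }) == [[1, 2, 3]]

// Separate odds from evens; release each group when it holds half of the input.
def list = (1..8).toList()
def result = aggregate(list,
        { it % 2 ? 'odd' : 'even' },
        { group -> group.size() == list.size().intdiv(2) })
assert result == [[1, 3, 5, 7], [2, 4, 6, 8]]
```

In the first case the trailing 4 starts a new group that never satisfies the release strategy, which is why it does not appear in the result.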

Message Channels

The DSL currently supports direct channels, pubSubChannels, and queueChannels. These may be declared within a messageFlow or doWithSpringIntegration closure. A channel name is required. Additional attributes supported by Spring Integration are passed as named parameters, using equivalent camel case names or the original hyphenated names enclosed in single quotes. For example:

queueChannel('queue2',capacity:10)  

Here capacity is simply passed to the equivalent queue-channel XML element as an attribute.

Channel Interceptors

Channel interceptors are defined by the interceptor method. If external to a channel, e.g., a child of messageFlow or doWithSpringIntegration, it is a global interceptor:

def flow
builder.doWithSpringIntegration {
    interceptor(pattern:'*',preSend:{payload -> println "payload:$payload"; payload*2})
    flow = messageFlow {
        transform {it.toUpperCase()}
        transform {it.toLowerCase()}
        transform {it.capitalize()}
    }
}

When nested within a channel, the pattern parameter is not valid. Note that the channel interceptor supports the named closures preSend, postSend, preReceive, and postReceive. Alternatively, you may provide a ref to a ChannelInterceptor bean.

The DSL also provides a wiretap which accepts a channel:

channel('inputChannel'){ wiretap(channel:'logger') } 

Working With Message Flows

As we have seen, IntegrationBuilder provides a top level messageFlow closure which returns a MessageFlow object. This is convenient if your integration may be accomplished with a single flow (which is often the case). Alternatively, one or more messageFlows may be nested in doWithSpringIntegration.

By default, the MessageFlow behaves as a Chain in Spring Integration parlance. That is, the endpoints are automatically wired together via direct (in memory) channels. However, the message flow is not actually constructed as a chain, which affords much more flexibility. For example, you may send a message to any component within the flow if you know its input channel name, i.e., if you define it explicitly. You may also reference externally defined channels within a flow to allow the use of channel adapters to enable remote transport protocols, file I/O, and the like, instead of direct channels. We have also seen that linkToNext:false provides a way to create multiple termination points within a flow. As such, the DSL does not support the Spring Integration chain element since it doesn't add much value.

In addition to allowing you to define separate independent flows, the DSL supports the notion of nested flows and the invocation of subflows, allowing encapsulation and reuse. When working with multiple flows, you can use the top level doWithSpringIntegration closure, containing one or more message flows, or define multiple top level messageFlow elements. The following are basically equivalent.

//Example 1
builder.doWithSpringIntegration {
    messageFlow('flow1',outputChannel:'outputChannel1') {
       transform {it.toUpperCase()}
     }

     messageFlow('flow2',inputChannel:'outputChannel1') {
       filter {it.class == String}
       transform {it.toLowerCase()}
    }
}

//Example 2
builder.setAutoCreateApplicationContext(false)

def flow1 = builder.messageFlow('flow1',outputChannel:'outputChannel1') {
    transform {it.toUpperCase()}
}

def flow2 = builder.messageFlow('flow2',inputChannel:'outputChannel1') {
     filter {it.class == String}
     transform {it.toLowerCase()}
}

By default, a top level messageFlow closure will create a Spring application context. setAutoCreateApplicationContext(false) overrides this behavior. In this case, the application context will be created the first time you invoke one of the send/receive methods, or you can create it explicitly:

builder.getIntegrationContext().createApplicationContext()

Message Flows may be nested:

def flow = builder.messageFlow {
     handle {payload -> payload.toUpperCase()}
     messageFlow {
         transform {it*2}
         messageFlow {
             transform {payload->payload.toLowerCase()}
         }
     }
 }

Or may be executed as subflows:

def mainflow

def ic = builder.doWithSpringIntegration {
     def subflow1 = messageFlow {
           filter {it.class == String}
           transform {it.toLowerCase()+'one'}
     }

     def subflow2 = messageFlow {
         transform {it.toUpperCase()+'two'}
     }

     mainflow = messageFlow ('main'){
        exec(subflow1)
        exec(subflow2)
     }
 }

 assert mainflow.sendAndReceive('Hello') == 'HELLOONEtwo'

In the example above, we define two subflows and invoke them from mainflow using the exec() method. While not very useful in itself, we can see how we could easily define subflows that may be reused in many message flows.
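To see why the assertion above holds, it can help to trace the same steps with plain Groovy closure composition. This is only an analogy: the closures below stand in for the subflows (the filter is omitted), and the >> operator stands in for the sequential exec() calls.

```groovy
// Plain closures standing in for the two subflows (analogy only, not the DSL).
def subflow1 = { it.toLowerCase() + 'one' }
def subflow2 = { it.toUpperCase() + 'two' }

// f >> g applies f first and then g, mirroring exec(subflow1) followed by exec(subflow2).
def mainflow = subflow1 >> subflow2

assert subflow1('Hello') == 'helloone'
assert mainflow('Hello') == 'HELLOONEtwo'
```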

Executing the DSL

As we have seen from the above examples, the DSL may be invoked directly from a Groovy class by constructing a top level doWithSpringIntegration or messageFlow closure. Alternatively, you can use one of the IntegrationBuilder.build() methods to load an external script.

The doWithSpringIntegration closure returns an IntegrationContext instance which provides access to the Spring Application Context, and any MessageFlows defined. IntegrationContext also provides some convenience methods to send and receive messages using any defined channel. For example:

 def integrationContext = new IntegrationBuilder().doWithSpringIntegration {
       messageFlow('flow') {
           ...
       }
 }

integrationContext.sendAndReceive('flow.inputChannel','someData')

def messageFlow = integrationContext.getMessageFlowByName('flow')

def allFlows = integrationContext.getMessageFlows()

def applicationContext = integrationContext.getApplicationContext()

Using Protocol Adapters

All of the examples so far illustrate how the DSL supports a messaging architecture using the Spring Integration programming model, but we haven't done any real integration yet. This requires access to remote resources via http, jms, amqp, tcp, jdbc, ftp, smtp, and the like, or access to the local file system. Spring Integration supports all of these and more. Ideally, the DSL should offer first class support for all of them, but it is a daunting task to implement every adapter and keep up as new ones are added to Spring Integration. So the expectation is that the DSL will continually be catching up with Spring Integration.

Initially, only a few adapters have been implemented in the DSL. Like Spring Integration, these are maintained in separate sub projects. Each adapter adds some new DSL language features and is essentially a pluggable module to the core. Thus, in order to use a DSL module, you must register it with the IntegrationBuilder by supplying a comma delimited list of module names to the IntegrationBuilder constructor. For example, if your application requires http and jms, you need to add the corresponding modules to your class path and register them with the builder:

   IntegrationBuilder integrationBuilder = new IntegrationBuilder("http","jms");

This adds the module specific methods to the builder and ensures the XML DOM builder declares the required Spring Integration XML namespaces. Each module also provides a DSL Descriptor to extend the DSL tooling support. Here's an example:

builder = new IntegrationBuilder('http')

def flow = builder.messageFlow {
    httpGet(url:{"http://www.google.com/finance/info?q=$it"},responseType:String)
}		

def result = flow.sendAndReceive('VMW')
println result	 

The 'http' identifies the spring-integration-dsl-groovy-http module which defines additional DSL methods such as httpGet() along with builders needed to construct an Http outbound gateway, etc. Notice that the DSL overloads the url parameter, which normally accepts a String, to accept a closure. This allows the url to be generated dynamically from the message contents, in this case the stock ticker symbol.

Fear not if your integration requires a Spring Integration adapter that is not yet supported by the DSL. You can easily declare whatever you need using Groovy markup for Spring XML:

 def builder = new IntegrationBuilder()
 builder.doWithSpringIntegration {
     namespaces('int-ftp')
     springXml {
         'int-ftp:outbound-gateway'(id:"gateway1",
             sessionFactory:"ftpSessionFactory",
             requestChannel:"inbound1",
             command:"ls",
             commandOptions:"-1",
             expression:"payload",
             replyChannel:"toSplitter")

         bean(id:"ftpSessionFactory",
              class:"org.springframework.integration.ftp.session.DefaultFtpSessionFactory"){
             property(name:"host", value:"localhost")
         }
     }

     messageFlow(inputChannel:"toSplitter") {
          split()
          ....
     }

 } 

See Spring XML Mapping for more details on working with native Spring XML.

See the README for the associated module to learn more about its DSL features. For technical details regarding adapter modules, please refer to the DSL Developer's Guide.

Executing the DSL from Java

There are several ways to load and run a DSL script from Java. The cafe demo included in the spring-integration-dsl-groovy-samples subproject illustrates how to load a Groovy compiled DSL script. This works pretty well if your project has the Groovy compiler enabled. The DSL script may also be parsed and loaded at runtime via IntegrationBuilder.build() which accepts, in addition to a groovy.lang.Script instance, an InputStream, Spring Resource, GroovyCodeSource, File, or Script class name. The loaded script should start with a top level closure, messageFlow or doWithSpringIntegration. The return value of build() is either a MessageFlow or IntegrationContext depending on the top level closure and requires type casting if you need to reference it directly. A better alternative is to use the IntegrationBuilder to access the application context, integration context, and message flows:

IntegrationBuilder builder = new IntegrationBuilder();
builder.build(new FileSystemResource("path/to/groovy/script/si-script.groovy"));
IntegrationContext ic = builder.getIntegrationContext();

// These delegate to the IntegrationContext
MessageFlow[] flows = builder.getMessageFlows();
ApplicationContext applicationContext = builder.getApplicationContext();

Working with the Spring Application Context

The IntegrationBuilder currently creates a new Spring ApplicationContext via one of its top level methods. If necessary, you may access the application context via IntegrationBuilder.getApplicationContext(). IntegrationBuilder also provides a constructor that accepts a parent context as an argument. If a parent context is provided, the builder context will be created as a child context, providing access to beans defined in the parent context. As mentioned in previous sections, you may override automatic creation of the application context via IntegrationBuilder.setAutoCreateApplicationContext(false).