Skip to content

Commit

Permalink
ASR-15: Add Aggregator Application Starter
Browse files Browse the repository at this point in the history
Fixes spring-attic/app-starters-release#15

* Add `EnvironmentPostProcessor` to perform `spring.autoconfigure.exclude`
* Add more properties classes from Spring Boot to the `spring-configuration-metadata-whitelist.properties`

Address PR comments
  • Loading branch information
artembilan authored and garyrussell committed Feb 10, 2017
1 parent 8518ac6 commit e918f8a
Show file tree
Hide file tree
Showing 12 changed files with 920 additions and 22 deletions.
35 changes: 27 additions & 8 deletions aggregator-app-dependencies/pom.xml
@@ -1,12 +1,31 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>org.springframework.cloud.stream.app</groupId>
<artifactId>aggregator-app-dependencies</artifactId>
<version>1.0.0.BUILD-SNAPSHOT</version>
<parent>
<artifactId>spring-cloud-dependencies-parent</artifactId>
<groupId>org.springframework.cloud</groupId>
<version>1.2.1.RELEASE</version>
<relativePath/>
</parent>


</project>
<groupId>org.springframework.cloud.stream.app</groupId>
<artifactId>aggregator-app-dependencies</artifactId>
<version>1.0.0.BUILD-SNAPSHOT</version>
<packaging>pom</packaging>
<name>aggregator-app-dependencies</name>
<description>Spring Cloud Stream Aggregator App Dependencies</description>

<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud.stream.app</groupId>
<artifactId>spring-cloud-starter-stream-aggregator</artifactId>
<version>1.0.0.BUILD-SNAPSHOT</version>
</dependency>
</dependencies>
</dependencyManagement>

</project>
2 changes: 1 addition & 1 deletion pom.xml
Expand Up @@ -6,7 +6,7 @@
<parent>
<groupId>org.springframework.cloud.stream.app</groupId>
<artifactId>app-starters-build</artifactId>
<version>1.1.2.BUILD-SNAPSHOT</version>
<version>1.1.3.BUILD-SNAPSHOT</version>
<relativePath/>
</parent>

Expand Down
57 changes: 57 additions & 0 deletions spring-cloud-starter-stream-aggregator/README.adoc
@@ -0,0 +1,57 @@
//tag::ref-doc[]
= Aggregator Processor

Use the `aggregator` application to combine multiple messages into one, based on some correlation mechanism.

This processor is fully based on the Aggregator component from http://docs.spring.io/spring-integration/reference/html/messaging-routing-chapter.html#aggregator[Spring Integration].
So, please, consult there for use-cases and functionality.

== Options

The **$$aggregator$$** $$processor$$ has the following options:

//tag::configuration-properties[]
$$aggregator.correlation$$:: $$SpEL expression for correlation key$$ *($$String$$, default: `correlationId` header)*
$$aggregator.release$$:: $$SpEL expression for release strategy$$ *($$String$$, default: group size == sequenceSize header.)*
$$aggregator.aggregation$$:: $$SpEL expression for aggregation strategy$$ *($$String$$, default: collection of payloads)*
$$aggregator.groupTimeout$$:: $$SpEL expression for a timeout to expire uncompleted groups$$ *($$String$$, default: none)*
$$aggregator.messageStoreType$$:: $$Message store type.
Possible values are: simple, jdbc, redis, mongodb, gemfire $$ *($$String$$, default: `simple`)*
$$aggregator.messageStoreEntity$$:: $$Persistence message store entity: table prefix in RDBMS, collection name in MongoDb, etc$$ *($$String$$, default: none)*
//end::configuration-properties[]

By default the `aggregator` processor uses:
- `HeaderAttributeCorrelationStrategy(IntegrationMessageHeaderAccessor.CORRELATION_ID)` - for `correlation`;
- `SequenceSizeReleaseStrategy` - for `release`;
- `DefaultAggregatingMessageGroupProcessor` - for `aggregation`;
- `SimpleMessageStore` - for `messageStoreType`.

The `aggregator` application can be configured for persistent `MessageGroupStore` http://docs.spring.io/spring-integration/reference/html/system-management-chapter.html#message-store[implementations].
The configuration for target technology is fully based on the Spring Boot auto-configuration.
But default JDBC, MongoDb and Redis auto-configurations are excluded.
They are `@Import` ed basing on the `aggregator.messageStoreType` configuration property.
Consult Spring Boot http://docs.spring.io/spring-boot/docs/current/reference/html/[Reference Manual] for auto-configuration for particular technology you use for `aggregator`.

The JDBC `JdbcMessageStore` requires particular tables in the target data base.
You can find schema scripts for appropriate RDBMS vendors in the `org.springframework.integration.jdbc` package of the `spring-integration-jdbc` jar.
Those scripts can be used for automatic data base initialization via Spring Boot.

For example:
```
java -jar aggregator-rabbit-1.0.0.RELEASE
--aggregator.message-store-type=jdbc
--spring.datasource.url=jdbc:h2:mem:test
--spring.datasource.schema=org/springframework/integration/jdbc/schema-h2.sql
```

//end::ref-doc[]
== Building with Maven

```
$> mvn package
```

== Code of Conduct
This project adheres to the Contributor Covenant link:CODE_OF_CONDUCT.adoc[code of conduct].
By participating, you are expected to uphold this code.
Please report unacceptable behavior to spring-code-of-conduct@pivotal.io.
113 changes: 100 additions & 13 deletions spring-cloud-starter-stream-aggregator/pom.xml
@@ -1,19 +1,106 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<parent>
<groupId>org.springframework.cloud.stream.app</groupId>
<artifactId>header-enricher-app-starters-build</artifactId>
<version>1.0.0.BUILD-SNAPSHOT</version>
</parent>
<parent>
<groupId>org.springframework.cloud.stream.app</groupId>
<artifactId>aggregator-app-starters-build</artifactId>
<version>1.0.0.BUILD-SNAPSHOT</version>
</parent>

<artifactId>spring-cloud-starter-stream-aggregator</artifactId>
<packaging>jar</packaging>
<name>spring-cloud-starter-stream-aggregator</name>
<description>Spring Cloud Stream Aggregator Starter</description>
<artifactId>spring-cloud-starter-stream-aggregator</artifactId>
<packaging>jar</packaging>
<name>spring-cloud-starter-stream-aggregator</name>
<description>Spring Cloud Stream Aggregator Starter</description>

<dependencies>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mongodb</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-mongodb</artifactId>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>de.flapdoodle.embed</groupId>
<artifactId>de.flapdoodle.embed.mongo</artifactId>
<scope>test</scope>
</dependency>

</project>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-redis</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
<scope>runtime</scope>
</dependency>

<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-gemfire</artifactId>
</dependency>

<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-jdbc</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-jdbc</artifactId>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.hsqldb</groupId>
<artifactId>hsqldb</artifactId>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>com.h2database</groupId>
<artifactId>h2</artifactId>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.mariadb.jdbc</groupId>
<artifactId>mariadb-java-client</artifactId>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.postgresql</groupId>
<artifactId>postgresql</artifactId>
<scope>runtime</scope>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-app-starter-doc-maven-plugin</artifactId>
</plugin>
<plugin>
<groupId>org.springframework.cloud.stream.app.plugin</groupId>
<artifactId>spring-cloud-stream-app-maven-plugin</artifactId>
<configuration>
<generatedProjectHome>${session.executionRootDirectory}/apps</generatedProjectHome>
<generatedProjectVersion>${project.version}</generatedProjectVersion>
<bom>
<name>scs-bom</name>
<groupId>org.springframework.cloud.stream.app</groupId>
<artifactId>aggregator-app-dependencies</artifactId>
<version>${project.version}</version>
</bom>
<generatedApps>
<aggregator/>
</generatedApps>
</configuration>
</plugin>
</plugins>
</build>

</project>
@@ -0,0 +1,117 @@
/*
* Copyright 2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.cloud.stream.app.aggregator;

import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.BeanFactoryAware;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Processor;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import org.springframework.integration.aggregator.CorrelationStrategy;
import org.springframework.integration.aggregator.DefaultAggregatingMessageGroupProcessor;
import org.springframework.integration.aggregator.ExpressionEvaluatingCorrelationStrategy;
import org.springframework.integration.aggregator.ExpressionEvaluatingMessageGroupProcessor;
import org.springframework.integration.aggregator.ExpressionEvaluatingReleaseStrategy;
import org.springframework.integration.aggregator.MessageGroupProcessor;
import org.springframework.integration.aggregator.ReleaseStrategy;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.config.AggregatorFactoryBean;
import org.springframework.integration.store.MessageGroupStore;

/**
* A Processor app that performs aggregation.
*
* @author Artem Bilan
*/
@EnableBinding(Processor.class)
@EnableConfigurationProperties(AggregatorProperties.class)
public class AggregatorConfiguration {

@Autowired
private AggregatorProperties properties;

@Autowired
private BeanFactory beanFactory;

@Bean
@ServiceActivator(inputChannel = Processor.INPUT)
public AggregatorFactoryBean aggregator(
ObjectProvider<CorrelationStrategy> correlationStrategy,
ObjectProvider<ReleaseStrategy> releaseStrategy,
ObjectProvider<MessageGroupProcessor> messageGroupProcessor,
ObjectProvider<MessageGroupStore> messageStore) {
AggregatorFactoryBean aggregator = new AggregatorFactoryBean();
aggregator.setOutputChannelName(Processor.OUTPUT);
aggregator.setExpireGroupsUponCompletion(true);
aggregator.setSendPartialResultOnExpiry(true);
aggregator.setGroupTimeoutExpression(this.properties.getGroupTimeout());

aggregator.setCorrelationStrategy(correlationStrategy.getIfAvailable());
aggregator.setReleaseStrategy(releaseStrategy.getIfAvailable());


MessageGroupProcessor groupProcessor = messageGroupProcessor.getIfAvailable();

if (groupProcessor == null) {
groupProcessor = new DefaultAggregatingMessageGroupProcessor();
((BeanFactoryAware) groupProcessor).setBeanFactory(this.beanFactory);
}
aggregator.setProcessorBean(groupProcessor);

aggregator.setMessageStore(messageStore.getIfAvailable());

return aggregator;
}

@Bean
@ConditionalOnProperty(prefix = AggregatorProperties.PREFIX, name = "correlation")
@ConditionalOnMissingBean
public CorrelationStrategy correlationStrategy() {
return new ExpressionEvaluatingCorrelationStrategy(this.properties.getCorrelation());
}

@Bean
@ConditionalOnProperty(prefix = AggregatorProperties.PREFIX, name = "release")
@ConditionalOnMissingBean
public ReleaseStrategy releaseStrategy() {
return new ExpressionEvaluatingReleaseStrategy(this.properties.getRelease().getExpressionString());
}

@Bean
@ConditionalOnProperty(prefix = AggregatorProperties.PREFIX, name = "aggregation")
@ConditionalOnMissingBean
public MessageGroupProcessor messageGroupProcessor() {
return new ExpressionEvaluatingMessageGroupProcessor(this.properties.getAggregation().getExpressionString());
}


@Configuration
@ConditionalOnMissingBean(MessageGroupStore.class)
@Import({ MessageStoreConfiguration.Mongo.class, MessageStoreConfiguration.Redis.class,
MessageStoreConfiguration.Gemfire.class, MessageStoreConfiguration.Jdbc.class })
protected static class MessageStoreAutoConfiguration {

}

}

0 comments on commit e918f8a

Please sign in to comment.