Skip to content

Commit f7b3122

Browse files
fbalicchiaartembilan
authored andcommitted
INTEXT-215: Cassandra Namespace Support
JIRA: https://jira.spring.io/browse/INTEXT-215
1 parent 2e92477 commit f7b3122

16 files changed

+976
-68
lines changed

spring-integration-cassandra/build.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ ext {
4040
jacocoVersion = '0.7.2.201409121644'
4141
slf4jVersion = '1.7.12'
4242
springDataCassandraVersion = '1.3.0.RELEASE'
43-
springIntegrationVersion = '4.2.0.RELEASE'
43+
springIntegrationVersion = '4.2.4.RELEASE'
4444

4545
idPrefix = 'cassandra'
4646

spring-integration-cassandra/src/main/java/org/springframework/integration/cassandra/config/xml/CassandraNamespaceHandler.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2015 the original author or authors
2+
* Copyright 2015-2016 the original author or authors
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -20,12 +20,14 @@
2020

2121
/**
2222
* @author Artem Bilan
23+
* @author Filippo Balicchia
2324
*/
2425
public class CassandraNamespaceHandler extends AbstractIntegrationNamespaceHandler {
2526

2627
@Override
2728
public void init() {
28-
29+
registerBeanDefinitionParser("outbound-channel-adapter", new CassandraOutboundChannelAdapterParser());
30+
registerBeanDefinitionParser("outbound-gateway", new CassandraOutboundGatewayParser());
2931
}
3032

3133
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
/*
2+
* Copyright 2016 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.integration.cassandra.config.xml;
18+
19+
import org.w3c.dom.Element;
20+
21+
import org.springframework.beans.factory.support.AbstractBeanDefinition;
22+
import org.springframework.beans.factory.support.BeanDefinitionBuilder;
23+
import org.springframework.beans.factory.xml.ParserContext;
24+
import org.springframework.integration.cassandra.outbound.CassandraMessageHandler;
25+
import org.springframework.integration.config.xml.AbstractOutboundChannelAdapterParser;
26+
27+
/**
28+
* @author Filippo Balicchia
29+
*/
30+
public class CassandraOutboundChannelAdapterParser extends AbstractOutboundChannelAdapterParser {
31+
32+
@Override
33+
protected AbstractBeanDefinition parseConsumer(Element element, ParserContext parserContext) {
34+
BeanDefinitionBuilder builder = BeanDefinitionBuilder.genericBeanDefinition(CassandraMessageHandler.class);
35+
builder.addPropertyValue("producesReply", false);
36+
CassandraParserUtils.processOutboundTypeAttributes(element, parserContext, builder);
37+
return builder.getBeanDefinition();
38+
}
39+
40+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
/*
2+
* Copyright 2016 the original author or authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.integration.cassandra.config.xml;
18+
19+
import org.w3c.dom.Element;
20+
21+
import org.springframework.beans.factory.support.BeanDefinitionBuilder;
22+
import org.springframework.beans.factory.xml.ParserContext;
23+
import org.springframework.integration.cassandra.outbound.CassandraMessageHandler;
24+
import org.springframework.integration.config.xml.AbstractConsumerEndpointParser;
25+
import org.springframework.integration.config.xml.IntegrationNamespaceUtils;
26+
27+
/**
28+
* @author Filippo Balicchia
29+
*/
30+
public class CassandraOutboundGatewayParser extends AbstractConsumerEndpointParser {
31+
32+
33+
@Override
34+
protected String getInputChannelAttributeName() {
35+
return "request-channel";
36+
}
37+
38+
@Override
39+
protected BeanDefinitionBuilder parseHandler(Element element, ParserContext parserContext) {
40+
BeanDefinitionBuilder builder = BeanDefinitionBuilder.genericBeanDefinition(CassandraMessageHandler.class);
41+
builder.addPropertyValue("producesReply", true);
42+
IntegrationNamespaceUtils.setReferenceIfAttributeDefined(builder, element, "reply-channel", "outputChannel");
43+
CassandraParserUtils.processOutboundTypeAttributes(element, parserContext, builder);
44+
return builder;
45+
}
46+
47+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
/*
2+
* Copyright 2016 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.integration.cassandra.config.xml;
18+
19+
import java.util.List;
20+
21+
import org.w3c.dom.Element;
22+
23+
import org.springframework.beans.factory.config.BeanDefinition;
24+
import org.springframework.beans.factory.support.BeanDefinitionBuilder;
25+
import org.springframework.beans.factory.support.ManagedMap;
26+
import org.springframework.beans.factory.xml.AbstractBeanDefinitionParser;
27+
import org.springframework.beans.factory.xml.ParserContext;
28+
import org.springframework.integration.config.xml.IntegrationNamespaceUtils;
29+
import org.springframework.util.CollectionUtils;
30+
import org.springframework.util.StringUtils;
31+
import org.springframework.util.xml.DomUtils;
32+
33+
/**
34+
* @author Filippo Balicchia
35+
*/
36+
public class CassandraParserUtils {
37+
38+
public static void processOutboundTypeAttributes(Element element, ParserContext parserContext,
39+
BeanDefinitionBuilder builder) {
40+
41+
String cassandraTemplate = element.getAttribute("cassandra-template");
42+
String mode = element.getAttribute("mode");
43+
String ingestQuery = element.getAttribute("ingest-query");
44+
String query = element.getAttribute("query");
45+
46+
if (StringUtils.isEmpty(cassandraTemplate)) {
47+
parserContext.getReaderContext().error("cassandra-template is required", element);
48+
}
49+
50+
builder.addConstructorArgReference(cassandraTemplate);
51+
if (!StringUtils.isEmpty(mode)) {
52+
builder.addConstructorArgValue(mode);
53+
}
54+
55+
BeanDefinition statementExpressionDef = IntegrationNamespaceUtils
56+
.createExpressionDefIfAttributeDefined("statement-expression", element);
57+
58+
if (statementExpressionDef != null) {
59+
builder.addPropertyValue("statementExpression", statementExpressionDef);
60+
}
61+
62+
if (!areMutuallyExclusive(query, statementExpressionDef, ingestQuery)) {
63+
parserContext.getReaderContext()
64+
.error("'query', 'ingest-query', 'statement-expression' are mutually exclusive", element);
65+
}
66+
67+
IntegrationNamespaceUtils.setReferenceIfAttributeDefined(builder, element, "write-options");
68+
IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, "ingest-query");
69+
IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, "query");
70+
71+
List<Element> parameterExpressions = DomUtils.getChildElementsByTagName(element, "parameter-expression");
72+
if (!CollectionUtils.isEmpty(parameterExpressions)) {
73+
ManagedMap<String, Object> parameterExpressionsMap = new ManagedMap<String, Object>();
74+
for (Element parameterExpressionElement : parameterExpressions) {
75+
String name = parameterExpressionElement.getAttribute(AbstractBeanDefinitionParser.NAME_ATTRIBUTE);
76+
BeanDefinition expression = IntegrationNamespaceUtils.createExpressionDefIfAttributeDefined(
77+
IntegrationNamespaceUtils.EXPRESSION_ATTRIBUTE, parameterExpressionElement);
78+
if (expression != null) {
79+
parameterExpressionsMap.put(name, expression);
80+
}
81+
82+
}
83+
builder.addPropertyValue("parameterExpressions", parameterExpressionsMap);
84+
}
85+
86+
}
87+
88+
public static boolean areMutuallyExclusive(String query, BeanDefinition statementExpressionDef,
89+
String ingestQuery) {
90+
return StringUtils.isEmpty(query) && statementExpressionDef == null && StringUtils.isEmpty(ingestQuery)
91+
|| !(StringUtils.hasText(query) && statementExpressionDef != null && StringUtils.hasText(ingestQuery))
92+
&& (StringUtils.hasText(query) ^ statementExpressionDef != null) ^ StringUtils.hasText(ingestQuery);
93+
}
94+
95+
}

spring-integration-cassandra/src/main/java/org/springframework/integration/cassandra/outbound/CassandraMessageHandler.java

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2015 the original author or authors.
2+
* Copyright 2015-2016 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -44,6 +44,7 @@
4444
/**
4545
* @author Soby Chacko
4646
* @author Artem Bilan
47+
* @author Filippo Balicchia
4748
*/
4849
@SuppressWarnings("unchecked")
4950
public class CassandraMessageHandler<T> extends AbstractReplyProducingMessageHandler {
@@ -52,7 +53,7 @@ public class CassandraMessageHandler<T> extends AbstractReplyProducingMessageHan
5253

5354
private final CassandraOperations cassandraTemplate;
5455

55-
private Type queryType;
56+
private Type mode;
5657

5758
private boolean producesReply;
5859

@@ -78,13 +79,13 @@ public CassandraMessageHandler(CassandraOperations cassandraTemplate, CassandraM
7879
Assert.notNull(cassandraTemplate, "'cassandraTemplate' must not be null.");
7980
Assert.notNull(queryType, "'queryType' must not be null.");
8081
this.cassandraTemplate = cassandraTemplate;
81-
this.queryType = queryType;
82+
this.mode = queryType;
8283
}
8384

8485
public void setIngestQuery(String ingestQuery) {
8586
Assert.hasText(ingestQuery, "'ingestQuery' must not be empty");
8687
this.ingestQuery = ingestQuery;
87-
this.queryType = Type.INSERT;
88+
this.mode = Type.INSERT;
8889
}
8990

9091
public void setWriteOptions(WriteOptions writeOptions) {
@@ -147,7 +148,7 @@ public void setParameterExpressions(Map<String, Expression> parameterExpressions
147148
public void setStatementProcessor(MessageProcessor<Statement> statementProcessor) {
148149
Assert.notNull(statementProcessor, "'statementProcessor' must not be null.");
149150
this.statementProcessor = statementProcessor;
150-
this.queryType = Type.STATEMENT;
151+
this.mode = Type.STATEMENT;
151152
}
152153

153154
@Override
@@ -175,16 +176,16 @@ protected Object handleRequestMessage(Message<?> requestMessage) {
175176

176177
Object result = payload;
177178

178-
Type queryType = this.queryType;
179+
Type mode = this.mode;
179180

180181
Statement statement = null;
181182

182183
if (payload instanceof Statement) {
183184
statement = (Statement) payload;
184-
queryType = Type.STATEMENT;
185+
mode = Type.STATEMENT;
185186
}
186187

187-
switch (queryType) {
188+
switch (mode) {
188189
case INSERT:
189190
if (this.ingestQuery != null) {
190191
Assert.isInstanceOf(List.class, payload,

0 commit comments

Comments
 (0)