Skip to content

Commit 573e1f3

Browse files
erenavsarogullariartembilan
authored andcommitted
INTEXT-167: Add Cluster Monitor Channel Adapter
JIRA: https://jira.spring.io/browse/INTEXT-167 Some refactorings are done. Hazelcast Cluster Monitor Unit Test Refactorings are done. Some minor bug fixes are done. Redundant assert is removed. HazelcastInstance lifecycle check is added Migration Event UT Case is updated. Code style polishing and upgrade to the `spring.io.plugin-0.4.0`
1 parent 5c136e4 commit 573e1f3

File tree

13 files changed

+854
-22
lines changed

13 files changed

+854
-22
lines changed

README.md

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ The Spring Integration Extensions project provides extension modules for [Spring
66
## Available Modules
77

88
* [Amazon Web Services (AWS)][] Support
9+
* [Hazelcast][] Support
910
* [Kafka][] Support
1011
* [MQ Telemetry Transport (MQTT)][] Support
1112
* [Print][] Support
@@ -139,6 +140,7 @@ The Spring Integration Extensions Framework is released under version 2.0 of the
139140
[MQ Telemetry Transport]: http://mqtt.org/
140141
[Websockets]: http://www.html5rocks.com/en/tutorials/websockets/basics/
141142
[XQuery]: http://en.wikipedia.org/wiki/XQuery
142-
[Splunk]:http://www.splunk.com/
143+
[Splunk]: http://www.splunk.com/
143144
[Amazon Web Services (AWS)]: http://aws.amazon.com/
144145
[MQ Telemetry Transport (MQTT)]: http://mqtt.org/
146+
[Hazelcast]: http://hazelcast.org/

spring-integration-hazelcast/README.md

Lines changed: 37 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ Hazelcast Event-Driven Inbound Channel Adapter listens related cache events and
2828
```
2929
Basically, Hazelcast Event-Driven Inbound Channel Adapter requires following attributes :
3030

31-
* **channel :** Specifies channel which message is sent. It is mandatory attribute.
31+
* **channel :** Specifies channel which message is sent.
3232
* **cache :** Specifies the distributed Object reference which is listened. It is mandatory attribute.
3333
* **cache-events :** Specifies cache events which are listened. It is optional attribute and its default value is ADDED. Its supported values are as follows :
3434

@@ -155,7 +155,7 @@ Sample definitions are as follows :
155155

156156
## HAZELCAST CONTINUOUS QUERY INBOUND CHANNEL ADAPTER
157157

158-
Hazelcast Continuous Query enables to listen to the modifications performed on specific map entries. Hazelcast Continuous Query Inbound Channel Adapter is an event-driven inbound channel adapter and listens related distributed map events in the light of defined predicate. Its basic definition is as follows :
158+
Hazelcast Continuous Query enables to listen to the modifications performed on specific map entries. Hazelcast Continuous Query Inbound Channel Adapter is an event-driven channel adapter and listens to related distributed map events in the light of defined predicate. Its basic definition is as follows :
159159
```
160160
<int-hazelcast:cq-inbound-channel-adapter
161161
channel="cqMapChannel"
@@ -167,7 +167,7 @@ Hazelcast Continuous Query enables to listen to the modifications performed on s
167167
```
168168
Basically, it requires four attributes as follows :
169169

170-
* **channel :** Specifies channel which message is sent. It is mandatory attribute.
170+
* **channel :** Specifies channel which message is sent.
171171
* **cache :** Specifies distributed Map reference which is listened. It is mandatory attribute.
172172
* **cache-events :** Specifies cache events which are listened. It is optional attribute with ADDED default value. Supported values are ADDED, REMOVED, UPDATED, EVICTED, EVICT_ALL and CLEAR_ALL.
173173
* **predicate :** Specifies predicate to listen to the modifications performed on specific map entries. It is mandatory attribute.
@@ -200,6 +200,40 @@ Sample definition is as follows :
200200
**Reference :** http://docs.hazelcast.org/docs/3.4/manual/html-single/hazelcast-documentation.html#continuous-query
201201

202202

203+
## HAZELCAST CLUSTER MONITOR INBOUND CHANNEL ADAPTER
204+
205+
Hazelcast Cluster Monitor enables to listen to the modifications performed on cluster. Hazelcast Cluster Monitor Inbound Channel Adapter is an event-driven channel adapter and listens to related Membership, Distributed Object, Migration, Lifecycle and Client events. Its basic definition is as follows :
206+
```
207+
<int-hazelcast:cm-inbound-channel-adapter
208+
channel="monitorChannel"
209+
hazelcast-instance="instance"
210+
monitor-types="MEMBERSHIP, DISTRIBUTED_OBJECT, MIGRATION, LIFECYCLE, CLIENT" />
211+
```
212+
Basically, it requires four attributes as follows :
213+
214+
* **channel :** Specifies channel which message is sent.
215+
* **hazelcast-instance :** Specifies Hazelcast Instance reference to listen cluster events. It is mandatory attribute.
216+
* **monitor-types :** Specifies monitor types which are listened. It is optional attribute with MEMBERSHIP default value. Supported values are MEMBERSHIP, DISTRIBUTED_OBJECT, MIGRATION, LIFECYCLE, CLIENT.
217+
218+
Sample definition is as follows :
219+
```
220+
<int:channel id="monitorChannel"/>
221+
222+
<int-hazelcast:cm-inbound-channel-adapter
223+
channel="monitorChannel"
224+
hazelcast-instance="instance"
225+
monitor-types="MEMBERSHIP, DISTRIBUTED_OBJECT" />
226+
227+
<bean id="instance" class="com.hazelcast.core.Hazelcast"
228+
factory-method="newHazelcastInstance">
229+
<constructor-arg>
230+
<bean class="com.hazelcast.config.Config" />
231+
</constructor-arg>
232+
</bean>
233+
```
234+
**Reference :** http://docs.hazelcast.org/docs/latest/manual/html/distributedevents.html
235+
236+
203237
## HAZELCAST DISTRIBUTED-SQL INBOUND CHANNEL ADAPTER
204238

205239
Hazelcast allows to run distributed queries on the distributed map. Hazelcast Distributed SQL Inbound Channel Adapter is a poller-driven inbound channel adapter. It runs defined distributed-sql and returns results in the light of iteration type. Its basic definition is as follows :

spring-integration-hazelcast/build.gradle

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ buildscript {
1212
maven { url 'http://repo.spring.io/plugins-release' }
1313
}
1414
dependencies {
15-
classpath 'org.springframework.build.gradle:spring-io-plugin:0.0.3.RELEASE'
15+
classpath 'io.spring.gradle:spring-io-plugin:0.0.4.RELEASE'
1616
}
1717
}
1818

@@ -26,8 +26,12 @@ repositories {
2626
if (project.hasProperty('platformVersion')) {
2727
apply plugin: 'spring-io'
2828

29-
dependencies {
30-
springIoVersions "io.spring.platform:platform-bom:${platformVersion}@properties"
29+
dependencyManagement {
30+
springIoTestRuntime {
31+
imports {
32+
mavenBom "io.spring.platform:platform-bom:${platformVersion}"
33+
}
34+
}
3135
}
3236
}
3337

@@ -67,6 +71,8 @@ dependencies {
6771
compile "org.springframework.integration:spring-integration-core:$springIntegrationVersion"
6872
compile "com.hazelcast:hazelcast:$hazelcastVersion"
6973

74+
testCompile "com.hazelcast:hazelcast-client:$hazelcastVersion"
75+
7076
testCompile "org.springframework.integration:spring-integration-test:$springIntegrationVersion"
7177

7278
testRuntime "org.slf4j:slf4j-log4j12:$slf4jVersion"
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
/*
2+
* Copyright 2015 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.integration.hazelcast;
18+
19+
/**
20+
* Enumeration of Hazelcast Cluster Monitor Types
21+
*
22+
* @author Eren Avsarogullari
23+
* @since 1.0.0
24+
* @see org.springframework.integration.hazelcast.inbound.HazelcastClusterMonitorMessageProducer
25+
* @see com.hazelcast.core.MembershipListener
26+
* @see com.hazelcast.core.DistributedObjectListener
27+
* @see com.hazelcast.core.MigrationListener
28+
* @see com.hazelcast.core.LifecycleListener
29+
* @see com.hazelcast.core.ClientListener
30+
*/
31+
public enum ClusterMonitorType {
32+
33+
MEMBERSHIP, DISTRIBUTED_OBJECT, MIGRATION, LIFECYCLE, CLIENT;
34+
35+
}

spring-integration-hazelcast/src/main/java/org/springframework/integration/hazelcast/HazelcastIntegrationDefinitionValidator.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -40,11 +40,13 @@
4040
*/
4141
public class HazelcastIntegrationDefinitionValidator {
4242

43-
public static <E extends Enum<E>> void validateEnumType(final Class<E> enumType, final String cacheEventTypes) {
44-
Set<String> eventTypeSet = StringUtils.commaDelimitedListToSet(cacheEventTypes);
45-
for (String eventType : eventTypeSet) {
46-
Enum.valueOf(enumType, eventType);
43+
public static <E extends Enum<E>> Set<String> validateEnumType(final Class<E> enumType, final String types) {
44+
Set<String> typeSet = StringUtils.commaDelimitedListToSet(types);
45+
for (String type : typeSet) {
46+
Enum.valueOf(enumType, type);
4747
}
48+
49+
return typeSet;
4850
}
4951

5052
public static void validateCacheTypeForEventDrivenMessageProducer(final DistributedObject distributedObject) {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
/*
2+
* Copyright 2015 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.integration.hazelcast.config.xml;
18+
19+
import org.w3c.dom.Element;
20+
21+
import org.springframework.beans.factory.BeanDefinitionStoreException;
22+
import org.springframework.beans.factory.support.AbstractBeanDefinition;
23+
import org.springframework.beans.factory.support.BeanDefinitionBuilder;
24+
import org.springframework.beans.factory.support.BeanDefinitionReaderUtils;
25+
import org.springframework.beans.factory.xml.AbstractSingleBeanDefinitionParser;
26+
import org.springframework.beans.factory.xml.ParserContext;
27+
import org.springframework.integration.config.xml.IntegrationNamespaceUtils;
28+
import org.springframework.integration.hazelcast.inbound.HazelcastClusterMonitorMessageProducer;
29+
30+
import org.springframework.util.StringUtils;
31+
32+
/**
33+
* Parser for the {@code <int-hazelcast:cm-inbound-channel-adapter />} component.
34+
*
35+
* @author Eren Avsarogullari
36+
* @since 1.0.0
37+
*/
38+
public class HazelcastClusterMonitorInboundChannelAdapterParser extends
39+
AbstractSingleBeanDefinitionParser {
40+
41+
private static final String CHANNEL_ATTRIBUTE = "channel";
42+
43+
private static final String HAZELCAST_INSTANCE_ATTRIBUTE = "hazelcast-instance";
44+
45+
private static final String MONITOR_TYPES_ATTRIBUTE = "monitor-types";
46+
47+
private static final String OUTPUT_CHANNEL = "outputChannel";
48+
49+
private static final String MONITOR_EVENT_TYPES = "monitorEventTypes";
50+
51+
@Override
52+
protected Class<?> getBeanClass(Element element) {
53+
return HazelcastClusterMonitorMessageProducer.class;
54+
}
55+
56+
@Override
57+
protected String resolveId(Element element, AbstractBeanDefinition definition,
58+
ParserContext parserContext) throws BeanDefinitionStoreException {
59+
String id = super.resolveId(element, definition, parserContext);
60+
61+
if (!element.hasAttribute(CHANNEL_ATTRIBUTE)) {
62+
id = id + ".adapter";
63+
}
64+
65+
if (!StringUtils.hasText(id)) {
66+
id = BeanDefinitionReaderUtils.generateBeanName(definition,
67+
parserContext.getRegistry());
68+
}
69+
70+
return id;
71+
}
72+
73+
@Override
74+
protected void doParse(Element element, ParserContext parserContext,
75+
BeanDefinitionBuilder builder) {
76+
String channelName = element.getAttribute(CHANNEL_ATTRIBUTE);
77+
if (!StringUtils.hasText(channelName)) {
78+
channelName = IntegrationNamespaceUtils.createDirectChannel(element,
79+
parserContext);
80+
}
81+
82+
if (!StringUtils.hasText(element.getAttribute(HAZELCAST_INSTANCE_ATTRIBUTE))) {
83+
parserContext.getReaderContext().error(
84+
"'" + HAZELCAST_INSTANCE_ATTRIBUTE + "' attribute is required.",
85+
element);
86+
}
87+
88+
builder.addPropertyReference(OUTPUT_CHANNEL, channelName);
89+
builder.addConstructorArgReference(element
90+
.getAttribute(HAZELCAST_INSTANCE_ATTRIBUTE));
91+
IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element,
92+
MONITOR_TYPES_ATTRIBUTE, MONITOR_EVENT_TYPES);
93+
IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element,
94+
IntegrationNamespaceUtils.AUTO_STARTUP);
95+
IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element,
96+
IntegrationNamespaceUtils.PHASE);
97+
}
98+
99+
}

spring-integration-hazelcast/src/main/java/org/springframework/integration/hazelcast/config/xml/HazelcastIntegrationNamespaceHandler.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ public void init() {
3232
registerBeanDefinitionParser("outbound-channel-adapter", new HazelcastOutboundChannelAdapterParser());
3333
registerBeanDefinitionParser("cq-inbound-channel-adapter", new HazelcastContinuousQueryInboundChannelAdapterParser());
3434
registerBeanDefinitionParser("ds-inbound-channel-adapter", new HazelcastDistributedSQLInboundChannelAdapterParser());
35+
registerBeanDefinitionParser("cm-inbound-channel-adapter", new HazelcastClusterMonitorInboundChannelAdapterParser());
3536
}
3637

3738
}

spring-integration-hazelcast/src/main/java/org/springframework/integration/hazelcast/inbound/AbstractHazelcastMessageProducer.java

Lines changed: 8 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,6 @@
3333
import org.springframework.integration.hazelcast.message.EntryEventMessagePayload;
3434
import org.springframework.messaging.Message;
3535
import org.springframework.util.Assert;
36-
import org.springframework.util.StringUtils;
3736

3837
import com.hazelcast.core.AbstractIMapEvent;
3938
import com.hazelcast.core.DistributedObject;
@@ -71,11 +70,11 @@ protected Set<String> getCacheEvents() {
7170
}
7271

7372
public void setCacheEventTypes(String cacheEventTypes) {
74-
HazelcastIntegrationDefinitionValidator.validateEnumType(CacheEventType.class, cacheEventTypes);
75-
Set<String> cacheEvents = StringUtils.commaDelimitedListToSet(cacheEventTypes);
73+
Set<String> cacheEvents =
74+
HazelcastIntegrationDefinitionValidator.validateEnumType(CacheEventType.class, cacheEventTypes);
7675
Assert.notEmpty(cacheEvents, "'cacheEvents' must have elements");
77-
HazelcastIntegrationDefinitionValidator.validateCacheEventsByDistributedObject(
78-
this.distributedObject, cacheEvents);
76+
HazelcastIntegrationDefinitionValidator.validateCacheEventsByDistributedObject(this.distributedObject,
77+
cacheEvents);
7978
this.cacheEvents = cacheEvents;
8079
}
8180

@@ -103,7 +102,7 @@ protected abstract class AbstractHazelcastEventListener<E> {
103102
protected abstract Message<?> toMessage(E event);
104103

105104
protected void sendMessage(E event, InetSocketAddress socketAddress,
106-
CacheListeningPolicyType cacheListeningPolicyType) {
105+
CacheListeningPolicyType cacheListeningPolicyType) {
107106
if (CacheListeningPolicyType.ALL == cacheListeningPolicyType || isEventAcceptable(socketAddress)) {
108107
AbstractHazelcastMessageProducer.this.sendMessage(toMessage(event));
109108
}
@@ -178,12 +177,11 @@ public void mapCleared(MapEvent event) {
178177
@Override
179178
protected void processEvent(AbstractIMapEvent event) {
180179
if (getCacheEvents().contains(event.getEventType().toString())) {
180+
if (logger.isDebugEnabled()) {
181+
logger.debug("Received Event : " + event);
182+
}
181183
sendMessage(event, event.getMember().getSocketAddress(), getCacheListeningPolicy());
182184
}
183-
184-
if (logger.isDebugEnabled()) {
185-
logger.debug("Received Event : " + event);
186-
}
187185
}
188186

189187
@Override

0 commit comments

Comments
 (0)