Skip to content

Commit 894a60b

Browse files
viniciusccarvalhoartembilan
authored andcommitted
Add Hazelcast MessageStore implementation
- Hazelcast implementation of the `MessageStore` - Test case uses the same tests as gemfire plus a couple of more scenarios * Some polishing and Docs
1 parent 6474ce9 commit 894a60b

File tree

5 files changed

+239
-1
lines changed

5 files changed

+239
-1
lines changed

spring-integration-hazelcast/README.md

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -477,5 +477,24 @@ public LeaderInitiator initiator() {
477477
```
478478

479479
Then when a node is elected leader it will send `OnGrantedEvent` to all application listeners. See
480-
the [Spring Integration User Guide](http://docs.spring.io/spring-integration/reference/htmlsingle/#endpoint-roles)
480+
the [Spring Integration User Guide](http://docs.spring.io/spring-integration/reference/html/#endpoint-roles)
481481
for more information on how to use those events to control messaging endpoints.
482+
483+
## HAZELCAST MESSAGE STORE
484+
485+
For distributed messaging state management, for example for persistent `QueueChannel` or tracking `Aggregator` message groups, the `HazelcastMessageStore` implementation is provided:
486+
```java
487+
@Bean
488+
public HazelcastInstance hazelcastInstance() {
489+
return Hazelcast.newHazelcastInstance();
490+
}
491+
492+
@Bean
493+
public MessageGroupStore messageStore() {
494+
return new HazelcastMessageStore(hazelcastInstance());
495+
}
496+
```
497+
498+
By default the `SPRING_INTEGRATION_MESSAGE_STORE` `IMap` is used to store messages and groups key/value manner.
499+
Any custom `IMap` can be provided to the `HazelcastMessageStore`.
500+
See [Spring Integration User Guide](http://docs.spring.io/spring-integration/reference/html/system-management-chapter.html#message-store) for more information about `MessageStore`.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
/**
2+
* Provides the Leader Initiator support classes.
3+
*/
4+
package org.springframework.integration.hazelcast.leader;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
/*
2+
* Copyright 2017 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.integration.hazelcast.store;
18+
19+
import java.util.Collection;
20+
21+
import org.springframework.integration.store.AbstractKeyValueMessageStore;
22+
import org.springframework.util.Assert;
23+
24+
import com.hazelcast.core.HazelcastInstance;
25+
import com.hazelcast.core.IMap;
26+
import com.hazelcast.query.SqlPredicate;
27+
28+
/**
29+
* The Hazelcast {@link IMap}-based {@link AbstractKeyValueMessageStore} implementation.
30+
*
31+
* @author Vinicius Carvalho
32+
* @author Artem Bilan
33+
*/
34+
public class HazelcastMessageStore extends AbstractKeyValueMessageStore {
35+
36+
private static final String MESSAGE_STORE_MAP_NAME = "SPRING_INTEGRATION_MESSAGE_STORE";
37+
38+
private final IMap<Object, Object> map;
39+
40+
public HazelcastMessageStore(HazelcastInstance hazelcastInstance) {
41+
Assert.notNull(hazelcastInstance, "Hazelcast instance can't be null");
42+
this.map = hazelcastInstance.getMap(MESSAGE_STORE_MAP_NAME);
43+
}
44+
45+
public HazelcastMessageStore(IMap<Object, Object> map) {
46+
Assert.notNull(map, "IMap reference can not be null");
47+
this.map = map;
48+
}
49+
50+
@Override
51+
protected Object doRetrieve(Object id) {
52+
return this.map.get(id);
53+
}
54+
55+
@Override
56+
protected void doStore(Object id, Object objectToStore) {
57+
this.map.put(id, objectToStore);
58+
}
59+
60+
@Override
61+
protected Object doRemove(Object id) {
62+
return this.map.remove(id);
63+
}
64+
65+
@Override
66+
protected Collection<?> doListKeys(String keyPattern) {
67+
Assert.hasText(keyPattern, "'keyPattern' must not be empty");
68+
keyPattern = keyPattern.replaceAll("\\*", "%");
69+
SqlPredicate sqlPredicate = new SqlPredicate("__key like " + keyPattern);
70+
return this.map.values(sqlPredicate);
71+
}
72+
73+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
/**
2+
* Provides the Message Store support classes.
3+
*/
4+
package org.springframework.integration.hazelcast.store;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,138 @@
1+
/*
2+
* Copyright 2017 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.integration.hazelcast.store;
18+
19+
import static org.junit.Assert.assertEquals;
20+
import static org.junit.Assert.assertNotNull;
21+
import static org.junit.Assert.assertNotSame;
22+
import static org.junit.Assert.assertSame;
23+
24+
import java.util.ArrayList;
25+
import java.util.List;
26+
import java.util.Properties;
27+
28+
import org.junit.AfterClass;
29+
import org.junit.Before;
30+
import org.junit.BeforeClass;
31+
import org.junit.Test;
32+
33+
import org.springframework.integration.channel.DirectChannel;
34+
import org.springframework.integration.history.MessageHistory;
35+
import org.springframework.integration.store.MessageGroup;
36+
import org.springframework.integration.support.MessageBuilder;
37+
import org.springframework.integration.test.util.TestUtils;
38+
import org.springframework.messaging.Message;
39+
import org.springframework.messaging.support.GenericMessage;
40+
41+
import com.hazelcast.core.Hazelcast;
42+
import com.hazelcast.core.HazelcastInstance;
43+
import com.hazelcast.core.IMap;
44+
45+
/**
46+
* @author Vinicius Carvalho
47+
* @author Artem Bilan
48+
*/
49+
public class HazelcastMessageStoreTests {
50+
51+
private static HazelcastMessageStore store;
52+
53+
private static HazelcastInstance instance;
54+
55+
private static IMap<Object, Object> map;
56+
57+
@BeforeClass
58+
public static void init() throws Exception {
59+
instance = Hazelcast.newHazelcastInstance();
60+
map = instance.getMap("customTestsMessageStore");
61+
store = new HazelcastMessageStore(map);
62+
}
63+
64+
@AfterClass
65+
public static void destroy() throws Exception {
66+
instance.shutdown();
67+
}
68+
69+
@Before
70+
public void clean() throws Exception {
71+
map.clear();
72+
}
73+
74+
@Test
75+
public void testWithMessageHistory() throws Exception {
76+
77+
Message<?> message = new GenericMessage<>("Hello");
78+
DirectChannel fooChannel = new DirectChannel();
79+
fooChannel.setBeanName("fooChannel");
80+
DirectChannel barChannel = new DirectChannel();
81+
barChannel.setBeanName("barChannel");
82+
83+
message = MessageHistory.write(message, fooChannel);
84+
message = MessageHistory.write(message, barChannel);
85+
store.addMessage(message);
86+
message = store.getMessage(message.getHeaders().getId());
87+
MessageHistory messageHistory = MessageHistory.read(message);
88+
assertNotNull(messageHistory);
89+
assertEquals(2, messageHistory.size());
90+
Properties fooChannelHistory = messageHistory.get(0);
91+
assertEquals("fooChannel", fooChannelHistory.get("name"));
92+
assertEquals("channel", fooChannelHistory.get("type"));
93+
94+
}
95+
96+
@Test
97+
public void testAddAndRemoveMessagesFromMessageGroup() throws Exception {
98+
String groupId = "X";
99+
List<Message<?>> messages = new ArrayList<>();
100+
for (int i = 0; i < 25; i++) {
101+
Message<String> message = MessageBuilder.withPayload("foo").setCorrelationId(groupId).build();
102+
store.addMessagesToGroup(groupId, message);
103+
messages.add(message);
104+
}
105+
MessageGroup group = store.getMessageGroup(groupId);
106+
assertEquals(25, group.size());
107+
store.removeMessagesFromGroup(groupId, messages);
108+
group = store.getMessageGroup(groupId);
109+
assertEquals(0, group.size());
110+
}
111+
112+
@Test
113+
public void addAndGetMessage() throws Exception {
114+
115+
Message<?> message = MessageBuilder.withPayload("test").build();
116+
store.addMessage(message);
117+
Message<?> retrieved = store.getMessage(message.getHeaders().getId());
118+
assertEquals(message, retrieved);
119+
}
120+
121+
@Test
122+
public void customMap() throws Exception {
123+
assertSame(map, TestUtils.getPropertyValue(store, "map"));
124+
HazelcastMessageStore store2 = new HazelcastMessageStore(instance);
125+
assertNotSame(map, TestUtils.getPropertyValue(store2, "map"));
126+
}
127+
128+
@Test
129+
public void messageStoreSize() throws Exception {
130+
Message<?> message1 = MessageBuilder.withPayload("test").build();
131+
Message<?> message2 = MessageBuilder.withPayload("test").build();
132+
store.addMessage(message1);
133+
store.addMessage(message2);
134+
long size = store.getMessageCount();
135+
assertEquals(2, size);
136+
}
137+
138+
}

0 commit comments

Comments
 (0)