forked from infinispan/infinispan
-
Notifications
You must be signed in to change notification settings - Fork 1
/
JGroupsTransportTest.java
126 lines (112 loc) · 6.28 KB
/
JGroupsTransportTest.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
package org.infinispan.remoting.transport.jgroups;
import static org.testng.AssertJUnit.assertEquals;
import java.util.Arrays;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
import org.infinispan.commands.ReplicableCommand;
import org.infinispan.commands.remote.ClusteredGetCommand;
import org.infinispan.configuration.cache.CacheMode;
import org.infinispan.configuration.cache.ConfigurationBuilder;
import org.infinispan.remoting.inboundhandler.DeliverOrder;
import org.infinispan.remoting.inboundhandler.InboundInvocationHandler;
import org.infinispan.remoting.inboundhandler.Reply;
import org.infinispan.remoting.responses.CacheNotFoundResponse;
import org.infinispan.remoting.responses.Response;
import org.infinispan.remoting.rpc.ResponseMode;
import org.infinispan.remoting.transport.Address;
import org.infinispan.remoting.transport.impl.MapResponseCollector;
import org.infinispan.test.MultipleCacheManagersTest;
import org.infinispan.test.TestingUtil;
import org.infinispan.util.ByteString;
import org.infinispan.xsite.XSiteReplicateCommand;
import org.jgroups.util.UUID;
import org.testng.annotations.Test;
/**
* @author Dan Berindei
* @since 9.0
*/
@Test
public class JGroupsTransportTest extends MultipleCacheManagersTest {
public static final ByteString CACHE_NAME = ByteString.fromString("cache");
@Override
protected void createCacheManagers() throws Throwable {
ConfigurationBuilder configurationBuilder = new ConfigurationBuilder();
configurationBuilder.clustering().cacheMode(CacheMode.REPL_SYNC);
createCluster(configurationBuilder, 2);
}
public void testSynchronousIgnoreLeaversInvocationToNonMembers() throws Exception {
UUID randomUuid = UUID.randomUUID();
Address randomAddress = JGroupsAddressCache.fromJGroupsAddress(randomUuid);
JGroupsTransport transport = (JGroupsTransport) manager(0).getTransport();
long initialMessages = transport.getChannel().getSentMessages();
ReplicableCommand command = new ClusteredGetCommand("key", CACHE_NAME, 0);
CompletableFuture<Map<Address, Response>> future = transport
.invokeRemotelyAsync(Collections.singletonList(randomAddress), command,
ResponseMode.SYNCHRONOUS_IGNORE_LEAVERS, 1, null, DeliverOrder.NONE, true);
assertEquals(CacheNotFoundResponse.INSTANCE, future.get().get(randomAddress));
assertEquals(initialMessages, transport.getChannel().getSentMessages());
}
public void testInvokeCommandStaggeredToNonMember() throws Exception {
UUID randomUuid = UUID.randomUUID();
Address randomAddress = JGroupsAddressCache.fromJGroupsAddress(randomUuid);
// Send message only to non-member
JGroupsTransport transport = (JGroupsTransport) manager(0).getTransport();
ReplicableCommand command = new ClusteredGetCommand("key", ByteString.fromString("cache"), 0);
CompletionStage<Map<Address, Response>> future =
transport.invokeCommandStaggered(Collections.singletonList(randomAddress), command,
MapResponseCollector.ignoreLeavers(), DeliverOrder.NONE, 5,
TimeUnit.SECONDS);
assertEquals(Collections.singletonMap(randomAddress, CacheNotFoundResponse.INSTANCE),
future.toCompletableFuture().get());
// Send message to view member that doesn't have the cache and to non-member
CompletionStage<Map<Address, Response>> future2 =
transport.invokeCommandStaggered(Arrays.asList(address(1), randomAddress), command,
MapResponseCollector.ignoreLeavers(), DeliverOrder.NONE, 5,
TimeUnit.SECONDS);
Map<Object, Object> expected = TestingUtil.mapOf(address(1), CacheNotFoundResponse.INSTANCE,
randomAddress, CacheNotFoundResponse.INSTANCE);
assertEquals(expected, future2.toCompletableFuture().get());
// Send message to view member that doesn't have the cache and to non-member
// and block the response from the view member
CompletableFuture<Void> blocker = blockRemoteGets();
try {
CompletionStage<Map<Address, Response>> future3 =
transport.invokeCommandStaggered(Arrays.asList(address(1), randomAddress), command,
MapResponseCollector.ignoreLeavers(), DeliverOrder.NONE, 5,
TimeUnit.SECONDS);
// Wait for the stagger timeout (5s / 10 / 2) to expire before sending a reply back
Thread.sleep(500);
blocker.complete(null);
assertEquals(expected, future3.toCompletableFuture().get());
} finally {
blocker.complete(null);
}
}
private CompletableFuture<Void> blockRemoteGets() {
CompletableFuture<Void> blocker = new CompletableFuture<>();
InboundInvocationHandler oldInvocationHandler = TestingUtil.extractGlobalComponent(manager(1),
InboundInvocationHandler
.class);
InboundInvocationHandler blockingInvocationHandler = new InboundInvocationHandler() {
@Override
public void handleFromCluster(Address origin, ReplicableCommand command, Reply reply, DeliverOrder order) {
if (command instanceof ClusteredGetCommand) {
log.tracef("Blocking clustered get");
blocker.thenRun(() -> oldInvocationHandler.handleFromCluster(origin, command, reply, order));
} else {
oldInvocationHandler.handleFromCluster(origin, command, reply, order);
}
}
@Override
public void handleFromRemoteSite(String origin, XSiteReplicateCommand command, Reply reply,
DeliverOrder order) {
oldInvocationHandler.handleFromRemoteSite(origin, command, reply, order);
}
};
TestingUtil.replaceComponent(manager(1), InboundInvocationHandler.class, blockingInvocationHandler, true);
return blocker;
}
}