-
Notifications
You must be signed in to change notification settings - Fork 4
/
MultiThreadedMessageBusTest.java
110 lines (100 loc) · 4.38 KB
/
MultiThreadedMessageBusTest.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
/*
* #%L
* Bitmagasin integrationstest
*
* $Id$
* $HeadURL$
* %%
* Copyright (C) 2010 The State and University Library, The Royal Library and The State Archives, Denmark
* %%
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as
* published by the Free Software Foundation, either version 2.1 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Lesser Public License for more details.
*
* You should have received a copy of the GNU General Lesser Public
* License along with this program. If not, see
* <http://www.gnu.org/licenses/lgpl-2.1.html>.
* #L%
*/
package org.bitrepository.protocol.bus;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.bitrepository.bitrepositorymessages.IdentifyPillarsForGetFileRequest;
import org.bitrepository.bitrepositorymessages.Message;
import org.bitrepository.protocol.IntegrationTest;
import org.bitrepository.protocol.MessageContext;
import org.bitrepository.protocol.ProtocolComponentFactory;
import org.bitrepository.protocol.message.ExampleMessageFactory;
import org.bitrepository.protocol.messagebus.MessageListener;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.Test;
/**
* Class for testing the interface with the message bus.
*/
public class MultiThreadedMessageBusTest extends IntegrationTest {
/** The time to wait when sending a message before it definitely should
* have been consumed by a listener.*/
static final int TIME_FOR_WAIT = 2500;
private final static int threadCount = 3;
private int count = 0;
private final static String FINISH = "FINISH";
private BlockingQueue<String> finishQueue = new LinkedBlockingQueue<>(1);
MultiMessageListener listener;
@Override
protected void setupMessageBus() {
if (useEmbeddedMessageBus() && broker == null) {
broker = new LocalActiveMQBroker(settingsForTestClient.getMessageBusConfiguration());
broker.start();
}
messageBus = new MessageBusWrapper(ProtocolComponentFactory.getInstance().getMessageBus(
settingsForTestClient, securityManager), testEventManager);
}
@Test(groups = { "regressiontest" })
public final void manyTheadsBeforeFinish() throws Exception {
addDescription("Tests whether it is possible to start the handling of many threads simultaneously.");
IdentifyPillarsForGetFileRequest content =
ExampleMessageFactory.createMessage(IdentifyPillarsForGetFileRequest.class);
listener = new MultiMessageListener();
messageBus.addListener("BusActivityTest", listener);
content.setDestination("BusActivityTest");
addStep("Send one message for each listener", "When all have receiver, then they give respond on 'finishQueue'");
for(int i = 0; i < threadCount; i++) {
messageBus.sendMessage(content);
}
Assert.assertEquals(finishQueue.poll(TIME_FOR_WAIT, TimeUnit.MILLISECONDS), FINISH);
Assert.assertEquals(count, threadCount);
}
@AfterMethod(alwaysRun = true)
public void removeListener() {
messageBus.removeListener("BusActivityTest", listener);
}
protected class MultiMessageListener implements MessageListener {
private BlockingQueue<String> queue = new LinkedBlockingQueue<>(threadCount);
@Override
public final void onMessage(Message message, MessageContext messageContext) {
try {
testIfFinished();
Assert.assertNotNull(queue.poll(TIME_FOR_WAIT, TimeUnit.MILLISECONDS));
} catch (InterruptedException e) {
Assert.fail("Should not throw an exception: ", e);
}
}
private void testIfFinished() throws InterruptedException {
count++;
if(count >= threadCount) {
for(int i = 0; i < threadCount; i++) {
queue.put("Count '" + i + "'");
}
finishQueue.put(FINISH);
}
}
}
}