-
Notifications
You must be signed in to change notification settings - Fork 912
/
JournalFlushInterruptTest.java
109 lines (90 loc) · 4.6 KB
/
JournalFlushInterruptTest.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.soak.interrupt;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.MessageProducer;
import javax.jms.Session;
import java.io.File;
import java.lang.invoke.MethodHandles;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.management.ObjectNameBuilder;
import org.apache.activemq.artemis.api.core.management.QueueControl;
import org.apache.activemq.artemis.tests.soak.SoakTestBase;
import org.apache.activemq.artemis.tests.util.CFUtil;
import org.apache.activemq.artemis.tests.util.Wait;
import org.apache.activemq.artemis.utils.cli.helper.HelperCreate;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class JournalFlushInterruptTest extends SoakTestBase {
public static final String SERVER_NAME_0 = "interruptjf";
@BeforeClass
public static void createServers() throws Exception {
File server0Location = getFileServerLocation(SERVER_NAME_0);
deleteDirectory(server0Location);
{
HelperCreate cliCreateServer = new HelperCreate();
cliCreateServer.setRole("amq").setUser("artemis").setPassword("artemis").setAllowAnonymous(true).setNoWeb(false).setArtemisInstance(server0Location).
setConfiguration("./src/main/resources/servers/interruptjf");
cliCreateServer.setArgs("--java-options", "-Djava.rmi.server.hostname=localhost", "--queues", "JournalFlushInterruptTest", "--name", "interruptjf");
cliCreateServer.createServer();
}
}
private static final String JMX_SERVER_HOSTNAME = "localhost";
private static final int JMX_SERVER_PORT_0 = 1099;
private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
static String liveURI = "service:jmx:rmi:///jndi/rmi://" + JMX_SERVER_HOSTNAME + ":" + JMX_SERVER_PORT_0 + "/jmxrmi";
static ObjectNameBuilder nameBuilder = ObjectNameBuilder.create(ActiveMQDefaultConfiguration.getDefaultJmxDomain(), "jfinterrupt", true);
Process serverProcess;
@Before
public void before() throws Exception {
cleanupData(SERVER_NAME_0);
serverProcess = startServer(SERVER_NAME_0, 0, 30000);
disableCheckThread();
}
private void killProcess(Process process) throws Exception {
Runtime.getRuntime().exec("kill -9 " + process.pid());
}
@Test
public void testInterruptJF() throws Throwable {
final ConnectionFactory factory = CFUtil.createConnectionFactory("CORE", "tcp://localhost:61616");
final String queueName = "JournalFlushInterruptTest";
final int messageCount = 100;
try (Connection connection = factory.createConnection()) {
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer producer = session.createProducer(session.createQueue(queueName));
for (int i = 0; i < messageCount; i++) {
producer.send(session.createTextMessage("MessageCount: " + i));
}
}
QueueControl queueControl = getQueueControl(liveURI, nameBuilder, queueName, queueName, RoutingType.ANYCAST, 5000);
Wait.assertEquals(messageCount, queueControl::getMessageCount, 5000);
Thread.sleep(100);
killProcess(serverProcess);
Assert.assertTrue(serverProcess.waitFor(1, TimeUnit.MINUTES));
serverProcess = startServer(SERVER_NAME_0, 0, 0);
waitForServerToStart("tcp://localhost:61616", "artemis", "artemis", 5000);
queueControl = getQueueControl(liveURI, nameBuilder, queueName, queueName, RoutingType.ANYCAST, 5000);
Wait.assertEquals(messageCount, queueControl::getMessageCount);
}
}