-
Notifications
You must be signed in to change notification settings - Fork 912
/
ConfigTest.java
423 lines (349 loc) · 18.4 KB
/
ConfigTest.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.config;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.Topic;
import javax.sql.DataSource;
import java.io.File;
import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.ResultSet;
import java.util.List;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.broker.region.policy.FixedSizedSubscriptionRecoveryPolicy;
import org.apache.activemq.broker.region.policy.LastImageSubscriptionRecoveryPolicy;
import org.apache.activemq.broker.region.policy.NoSubscriptionRecoveryPolicy;
import org.apache.activemq.broker.region.policy.RoundRobinDispatchPolicy;
import org.apache.activemq.broker.region.policy.SimpleDispatchPolicy;
import org.apache.activemq.broker.region.policy.StrictOrderDispatchPolicy;
import org.apache.activemq.broker.region.policy.SubscriptionRecoveryPolicy;
import org.apache.activemq.broker.region.policy.TimedSubscriptionRecoveryPolicy;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.network.NetworkConnector;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.store.jdbc.DefaultDatabaseLocker;
import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
import org.apache.activemq.store.jdbc.adapter.TransactDatabaseLocker;
import org.apache.activemq.store.journal.JournalPersistenceAdapter;
import org.apache.activemq.store.memory.MemoryPersistenceAdapter;
import org.apache.activemq.transport.tcp.TcpTransportServer;
import org.apache.activemq.usage.SystemUsage;
import org.apache.activemq.xbean.BrokerFactoryBean;
import org.jmock.Expectations;
import org.jmock.Mockery;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.core.io.ClassPathResource;
import org.springframework.core.io.FileSystemResource;
import org.springframework.core.io.Resource;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
public class ConfigTest {
private static final Logger LOG = LoggerFactory.getLogger(ConfigTest.class);

/** Prefix the tests use when building the paths of journal directories. */
protected static final String JOURNAL_ROOT = "target/test-data/";

/** Prefix the tests use when building the paths of Derby directories (same value as the journal prefix). */
protected static final String DERBY_ROOT = "target/test-data/";

/** Directory holding the sample XML configurations that are loaded from the file system. */
protected static final String CONF_ROOT = "src/test/resources/org/apache/activemq/config/sample-conf/";

/*
 * Points the JVM-wide javax.net.ssl key store and trust store properties at the
 * key stores under src/test/resources. The properties are set once, when this
 * class is initialized, and are not restored afterwards.
 * NOTE(review): presumably required by SSL connectors in the sample
 * configurations - those files are not visible here, confirm.
 */
static {
    // Key store settings.
    System.setProperty("javax.net.ssl.keyStore", "src/test/resources/server.keystore");
    System.setProperty("javax.net.ssl.keyStorePassword", "password");
    System.setProperty("javax.net.ssl.keyStoreType", "jks");

    // Trust store settings.
    System.setProperty("javax.net.ssl.trustStore", "src/test/resources/client.keystore");
    System.setProperty("javax.net.ssl.trustStorePassword", "password");
    System.setProperty("javax.net.ssl.trustStoreType", "jks");
}
/*
* IMPORTANT NOTE: Assertions checking for the existence of the derby
* directory will fail if the first derby directory is not created under
* target/test-data/. The test is unable to change the derby root directory
* for succeeding creation. It uses the first created directory as the root.
*/
/**
 * Tests creating a journal persistence adapter using the persistence adapter
 * factory bean, as configured in {@code journaledjdbc-example.xml}.
 * <p>
 * The journal and Derby directories are deleted before the broker is created and
 * are asserted to exist afterwards.
 *
 * @throws Exception if the broker cannot be created, inspected or stopped
 */
@Test
public void testJournaledJDBCConfig() throws Exception {
    File journalFile = new File(JOURNAL_ROOT + "testJournaledJDBCConfig/journal");
    recursiveDelete(journalFile);
    File derbyFile = new File(DERBY_ROOT + "testJournaledJDBCConfig/derbydb"); // Default
    recursiveDelete(derbyFile);

    BrokerService broker = createBroker(new FileSystemResource(CONF_ROOT + "journaledjdbc-example.xml"));
    try {
        assertEquals("Broker Config Error (brokerName)", "brokerJournaledJDBCConfigTest", broker.getBrokerName());

        PersistenceAdapter adapter = broker.getPersistenceAdapter();
        assertTrue("Should have created a journal persistence adapter", adapter instanceof JournalPersistenceAdapter);
        assertTrue("Should have created a derby directory at " + derbyFile.getAbsolutePath(), derbyFile.exists());
        assertTrue("Should have created a journal directory at " + journalFile.getAbsolutePath(), journalFile.exists());

        // Check persistence factory configurations: the store usage must be
        // tracking a journal persistence adapter as well.
        assertTrue("Store usage should be backed by a journal persistence adapter",
                broker.getSystemUsage().getStoreUsage().getStore() instanceof JournalPersistenceAdapter);
        LOG.info("Success");
    } finally {
        // createBroker() asserts a non-null broker, so it can be stopped unconditionally.
        broker.stop();
    }
}
/**
 * Verifies that a {@link JDBCPersistenceAdapter} started against a data source
 * whose driver reports the name {@code Microsoft_SQL_Server_2005_jdbc_driver}
 * ends up with a {@link TransactDatabaseLocker}.
 * <p>
 * All JDBC collaborators are jMock mocks; table creation on startup is disabled.
 *
 * @throws Exception if the adapter fails to start or stop
 */
@Test
public void testJdbcLockConfigOverride() throws Exception {
    JDBCPersistenceAdapter adapter = new JDBCPersistenceAdapter();
    Mockery context = new Mockery();
    final DataSource dataSource = context.mock(DataSource.class);
    final Connection connection = context.mock(Connection.class);
    final DatabaseMetaData metadata = context.mock(DatabaseMetaData.class);
    final ResultSet result = context.mock(ResultSet.class);

    adapter.setDataSource(dataSource);
    adapter.setCreateTablesOnStartup(false);

    context.checking(new Expectations() {{
        allowing(dataSource).getConnection();
        will(returnValue(connection));
        allowing(connection).getMetaData();
        will(returnValue(metadata));
        // Any other call on the connection is tolerated.
        allowing(connection);
        allowing(metadata).getDriverName();
        will(returnValue("Microsoft_SQL_Server_2005_jdbc_driver"));
        allowing(result).next();
        will(returnValue(true));
    }});

    adapter.start();
    try {
        assertTrue("has the locker override", adapter.getLocker() instanceof TransactDatabaseLocker);
    } finally {
        // Stop the adapter even when the assertion fails so it is not left running.
        adapter.stop();
    }
}
/**
 * Tests configuring the different broker properties using xbeans-spring, based on
 * {@code org/apache/activemq/config/example.xml}: scalar broker properties, the VM
 * connector URI, transport and network connectors, per-topic dispatch and
 * subscription recovery policies, and the system usage limits.
 *
 * @throws Exception if the broker cannot be created, inspected or stopped
 */
@Test
public void testBrokerConfig() throws Exception {
    // Wipe the shared test-data root before the broker is created.
    recursiveDelete(new File(JOURNAL_ROOT));

    // Create broker from resource
    BrokerService broker = createBroker("org/apache/activemq/config/example.xml");
    try {
        LOG.info("Success");

        assertBrokerProperties(broker);
        LOG.info("Success");

        // Check specific vm transport
        assertEquals("Should have a specific VM Connector", "vm://javacoola", broker.getVmConnectorURI().toString());
        LOG.info("Success");

        assertConnectors(broker);
        LOG.info("Success");

        assertDispatchPolicies(broker);
        LOG.info("Success");

        assertSubscriptionRecoveryPolicies(broker);
        LOG.info("Success");

        assertSystemUsage(broker);
        LOG.info("Success");
    } finally {
        // createBroker() asserts a non-null broker, so it can be stopped unconditionally.
        broker.stop();
    }
}

/** Checks the broker name and the boolean broker-level settings. */
private static void assertBrokerProperties(BrokerService broker) throws Exception {
    assertEquals("Broker Config Error (brokerName)", "brokerConfigTest", broker.getBrokerName());
    assertEquals("Broker Config Error (populateJMSXUserID)", false, broker.isPopulateJMSXUserID());
    assertEquals("Broker Config Error (useLoggingForShutdownErrors)", true, broker.isUseLoggingForShutdownErrors());
    assertEquals("Broker Config Error (useJmx)", true, broker.isUseJmx());
    assertEquals("Broker Config Error (persistent)", false, broker.isPersistent());
    assertEquals("Broker Config Error (useShutdownHook)", false, broker.isUseShutdownHook());
    assertEquals("Broker Config Error (deleteAllMessagesOnStartup)", true, broker.isDeleteAllMessagesOnStartup());
}

/** Checks that the first three transport connectors are TCP based and that exactly one network connector exists. */
private static void assertConnectors(BrokerService broker) throws Exception {
    List<TransportConnector> connectors = broker.getTransportConnectors();
    assertTrue("Should have created at least 3 connectors", connectors.size() >= 3);
    final String[] ordinals = {"1st", "2nd", "3rd"};
    for (int i = 0; i < ordinals.length; i++) {
        assertTrue(ordinals[i] + " connector should be TcpTransportServer",
                connectors.get(i).getServer() instanceof TcpTransportServer);
    }

    List<NetworkConnector> networkConnectors = broker.getNetworkConnectors();
    assertEquals("Should have a single network connector", 1, networkConnectors.size());
}

/** Checks the dispatch policy type configured for each of the three dispatch test topics. */
private static void assertDispatchPolicies(BrokerService broker) throws Exception {
    ActiveMQTopic dest = new ActiveMQTopic("Topic.SimpleDispatch");
    assertTrue("Should have a simple dispatch policy for " + dest.getTopicName(),
            broker.getDestinationPolicy().getEntryFor(dest).getDispatchPolicy() instanceof SimpleDispatchPolicy);

    dest = new ActiveMQTopic("Topic.RoundRobinDispatch");
    assertTrue("Should have a round robin dispatch policy for " + dest.getTopicName(),
            broker.getDestinationPolicy().getEntryFor(dest).getDispatchPolicy() instanceof RoundRobinDispatchPolicy);

    dest = new ActiveMQTopic("Topic.StrictOrderDispatch");
    assertTrue("Should have a strict order dispatch policy for " + dest.getTopicName(),
            broker.getDestinationPolicy().getEntryFor(dest).getDispatchPolicy() instanceof StrictOrderDispatchPolicy);
}

/** Checks the subscription recovery policy type, and its settings where asserted, for each recovery test topic. */
private static void assertSubscriptionRecoveryPolicies(BrokerService broker) throws Exception {
    ActiveMQTopic dest = new ActiveMQTopic("Topic.FixedSizedSubs");
    SubscriptionRecoveryPolicy subsPolicy = recoveryPolicyFor(broker, dest);
    assertTrue("Should have a fixed sized subscription recovery policy for " + dest.getTopicName(),
            subsPolicy instanceof FixedSizedSubscriptionRecoveryPolicy);
    assertEquals("FixedSizedSubsPolicy Config Error (maximumSize)", 2000000,
            ((FixedSizedSubscriptionRecoveryPolicy) subsPolicy).getMaximumSize());
    assertEquals("FixedSizedSubsPolicy Config Error (useSharedBuffer)", false,
            ((FixedSizedSubscriptionRecoveryPolicy) subsPolicy).isUseSharedBuffer());

    dest = new ActiveMQTopic("Topic.LastImageSubs");
    subsPolicy = recoveryPolicyFor(broker, dest);
    assertTrue("Should have a last image subscription recovery policy for " + dest.getTopicName(),
            subsPolicy instanceof LastImageSubscriptionRecoveryPolicy);

    dest = new ActiveMQTopic("Topic.NoSubs");
    subsPolicy = recoveryPolicyFor(broker, dest);
    assertTrue("Should have no subscription recovery policy for " + dest.getTopicName(),
            subsPolicy instanceof NoSubscriptionRecoveryPolicy);

    dest = new ActiveMQTopic("Topic.TimedSubs");
    subsPolicy = recoveryPolicyFor(broker, dest);
    assertTrue("Should have a timed subscription recovery policy for " + dest.getTopicName(),
            subsPolicy instanceof TimedSubscriptionRecoveryPolicy);
    assertEquals("TimedSubsPolicy Config Error (recoverDuration)", 25000,
            ((TimedSubscriptionRecoveryPolicy) subsPolicy).getRecoverDuration());
}

/** Looks up the subscription recovery policy of the destination policy entry matching {@code dest}. */
private static SubscriptionRecoveryPolicy recoveryPolicyFor(BrokerService broker, ActiveMQTopic dest) throws Exception {
    return broker.getDestinationPolicy().getEntryFor(dest).getSubscriptionRecoveryPolicy();
}

/** Checks the memory, temp and store usage limits and that the store usage tracks a memory persistence adapter. */
private static void assertSystemUsage(BrokerService broker) throws Exception {
    SystemUsage systemUsage = broker.getSystemUsage();
    assertNotNull("Should have a SystemUsage", systemUsage);
    assertEquals("SystemUsage Config Error (MemoryUsage.limit)", 1024 * 1024 * 10, systemUsage.getMemoryUsage().getLimit());
    assertEquals("SystemUsage Config Error (MemoryUsage.percentUsageMinDelta)", 20, systemUsage.getMemoryUsage().getPercentUsageMinDelta());
    assertEquals("SystemUsage Config Error (TempUsage.limit)", 1024 * 1024 * 100, systemUsage.getTempUsage().getLimit());
    assertEquals("SystemUsage Config Error (StoreUsage.limit)", 1024 * 1024 * 1024, systemUsage.getStoreUsage().getLimit());
    assertEquals("SystemUsage Config Error (StoreUsage.name)", "foo", systemUsage.getStoreUsage().getName());
    assertNotNull(systemUsage.getStoreUsage().getStore());
    assertTrue(systemUsage.getStoreUsage().getStore() instanceof MemoryPersistenceAdapter);
}
/**
 * Tests creating a journal persistence adapter using xbeans-spring.
 * <p>
 * Loads {@code journal-example.xml} from the sample configuration directory and
 * checks the broker name, the persistence adapter type and that the journal
 * directory, deleted beforehand, exists afterwards.
 *
 * @throws Exception if the broker cannot be created, inspected or stopped
 */
@Test
public void testJournalConfig() throws Exception {
    final File journalDir = new File(JOURNAL_ROOT + "testJournalConfig/journal");
    recursiveDelete(journalDir);

    final Resource config = new FileSystemResource(CONF_ROOT + "journal-example.xml");
    final BrokerService broker = createBroker(config);
    try {
        assertEquals("Broker Config Error (brokerName)", "brokerJournalConfigTest", broker.getBrokerName());

        final boolean journalBacked = broker.getPersistenceAdapter() instanceof JournalPersistenceAdapter;
        assertTrue("Should have created a journal persistence adapter", journalBacked);

        final String missingJournalMessage = "Should have created a journal directory at " + journalDir.getAbsolutePath();
        assertTrue(missingJournalMessage, journalDir.exists());

        LOG.info("Success");
    } finally {
        if (broker != null) {
            broker.stop();
        }
    }
}
/**
 * Tests creating a memory persistence adapter using xbeans-spring.
 * <p>
 * Loads {@code memory-example.xml} from the sample configuration directory and
 * checks the broker name, the persistence adapter type and that neither the
 * Derby nor the journal directory for this test exists afterwards.
 *
 * @throws Exception if the broker cannot be created, inspected or stopped
 */
@Test
public void testMemoryConfig() throws Exception {
    final File journalDir = new File(JOURNAL_ROOT + "testMemoryConfig");
    recursiveDelete(journalDir);
    final File derbyDir = new File(DERBY_ROOT + "testMemoryConfig");
    recursiveDelete(derbyDir);

    final Resource config = new FileSystemResource(CONF_ROOT + "memory-example.xml");
    final BrokerService broker = createBroker(config);
    try {
        assertEquals("Broker Config Error (brokerName)", "brokerMemoryConfigTest", broker.getBrokerName());

        final boolean memoryBacked = broker.getPersistenceAdapter() instanceof MemoryPersistenceAdapter;
        assertTrue("Should have created a memory persistence adapter", memoryBacked);

        // A purely in-memory store must not leave any directory behind.
        final String derbyMessage = "Should have not created a derby directory at " + derbyDir.getAbsolutePath();
        assertTrue(derbyMessage, !derbyDir.exists());
        final String journalMessage = "Should have not created a journal directory at " + journalDir.getAbsolutePath();
        assertTrue(journalMessage, !journalDir.exists());

        LOG.info("Success");
    } finally {
        if (broker != null) {
            broker.stop();
        }
    }
}
/**
 * Verifies the per-connection producer and consumer limits of the {@code tcp}
 * transport connector configured in {@code connector-properties.xml}.
 * <p>
 * The configured limits are read back from the connector, then a client connects
 * to {@code tcp://localhost:61631} and checks that creating one producer more than
 * the limit, and more consumers than the limit, fails with a {@link JMSException}.
 *
 * @throws Exception if the broker or the client connection cannot be set up or torn down
 */
@Test
public void testConnectorConfig() throws Exception {
    // NOTE(review): these paths name testMemoryConfig's directories; this looks
    // like a copy-paste from that test - confirm which directories, if any,
    // connector-properties.xml actually uses.
    File journalFile = new File(JOURNAL_ROOT + "testMemoryConfig");
    recursiveDelete(journalFile);
    File derbyFile = new File(DERBY_ROOT + "testMemoryConfig");
    recursiveDelete(derbyFile);

    final int MAX_PRODUCERS = 5;
    final int MAX_CONSUMERS = 10;

    BrokerService broker = createBroker(new FileSystemResource(CONF_ROOT + "connector-properties.xml"));
    broker.start();
    try {
        // Expected value first, so a failure reports expected/actual the right way round.
        assertEquals(MAX_PRODUCERS, broker.getTransportConnectorByScheme("tcp").getMaximumProducersAllowedPerConnection());
        assertEquals(MAX_CONSUMERS, broker.getTransportConnectorByScheme("tcp").getMaximumConsumersAllowedPerConnection());

        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61631");
        javax.jms.Connection connection = activeMQConnectionFactory.createConnection();
        try {
            connection.start();
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Topic topic = session.createTopic("test.foo");

            // Creating producers up to the limit must succeed...
            for (int i = 0; i < MAX_PRODUCERS; i++) {
                session.createProducer(topic);
            }
            // ...and one more must be rejected.
            try {
                session.createProducer(topic);
                fail("Should have got an exception on exceeding MAX_PRODUCERS");
            } catch (JMSException expected) {
                // Expected: the producer limit was exceeded.
            }

            try {
                for (int i = 0; i < (MAX_CONSUMERS + 1); i++) {
                    MessageConsumer consumer = session.createConsumer(topic);
                    assertNotNull(consumer);
                }
                fail("Should have caught an exception");
            } catch (JMSException expected) {
                // Expected: the consumer limit was exceeded.
            }
        } finally {
            // Close the client connection before the broker is stopped so it is not leaked.
            connection.close();
        }
        LOG.info("Success");
    } finally {
        broker.stop();
    }
}
/**
 * Checks the broker name produced by both {@code createBroker} overloads: once
 * for {@code memory-example.xml} given as a file system resource and once for
 * {@code org/apache/activemq/config/config.xml} given as a resource name.
 *
 * @throws Exception if a broker cannot be created, inspected or stopped
 */
@Test
public void testXmlConfigHelper() throws Exception {
    // First broker: configuration read from the sample-conf directory on disk.
    final Resource fileSystemConfig = new FileSystemResource(CONF_ROOT + "memory-example.xml");
    final BrokerService fileSystemBroker = createBroker(fileSystemConfig);
    try {
        assertEquals("Broker Config Error (brokerName)", "brokerMemoryConfigTest", fileSystemBroker.getBrokerName());
    } finally {
        if (fileSystemBroker != null) {
            fileSystemBroker.stop();
        }
    }

    // Second broker: configuration given by resource name, created only after the first was stopped.
    final BrokerService namedResourceBroker = createBroker("org/apache/activemq/config/config.xml");
    try {
        assertEquals("Broker Config Error (brokerName)", "brokerXmlConfigHelper", namedResourceBroker.getBrokerName());
    } finally {
        if (namedResourceBroker != null) {
            namedResourceBroker.stop();
        }
    }
}
/*
* TODO: Create additional tests for forwarding bridges
*/
/**
 * Deletes {@code file}; when it is a directory, everything below it is deleted first.
 * <p>
 * Deletion is best-effort: the result of {@link File#delete()} is not checked, so
 * entries that cannot be removed are silently left in place.
 *
 * @param file the file or directory to remove; it does not have to exist
 */
protected static void recursiveDelete(File file) {
    if (file.isDirectory()) {
        // listFiles() returns null (not an empty array) when the directory cannot
        // be listed, e.g. on an I/O error; skip the descent in that case instead
        // of failing with a NullPointerException.
        File[] children = file.listFiles();
        if (children != null) {
            for (File child : children) {
                recursiveDelete(child);
            }
        }
    }
    file.delete();
}
/**
 * Creates a broker from a configuration identified by resource name; the name is
 * wrapped in a {@link ClassPathResource}.
 *
 * @param resource the resource name of the XML configuration
 * @return the broker returned by {@link #createBroker(Resource)}
 * @throws Exception if the broker cannot be created
 */
protected BrokerService createBroker(String resource) throws Exception {
    final Resource classPathConfig = new ClassPathResource(resource);
    return createBroker(classPathConfig);
}
/**
 * Builds a broker from the given configuration resource through a
 * {@link BrokerFactoryBean} and asserts that a broker was produced.
 *
 * @param resource the XML configuration to load
 * @return the broker created by the factory, never {@code null}
 * @throws Exception if the factory fails to initialize
 */
protected BrokerService createBroker(Resource resource) throws Exception {
    final BrokerFactoryBean brokerFactory = new BrokerFactoryBean(resource);
    brokerFactory.afterPropertiesSet();

    final BrokerService created = brokerFactory.getBroker();
    assertNotNull("Should have a broker!", created);

    // Broker is already started by default when using the XML file, so no
    // explicit start() is issued here.
    return created;
}
}