/
TransactionDurabilityTest.java
173 lines (110 loc) · 5.22 KB
/
TransactionDurabilityTest.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.client;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ClientConsumer;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientProducer;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.junit.Assert;
import org.junit.Test;
public class TransactionDurabilityTest extends ActiveMQTestBase {

   /*
    * Regression scenario for message durability when the same message is acknowledged
    * through two different queues, once transactionally and once not.
    *
    * Background (behaviour of the old implementation, as described by the original authors):
    * a persistent message routed to two queues yields two references. If one reference is
    * acknowledged inside a transaction that has not committed yet, and the other reference is
    * acknowledged by a non-transacted session, a delete record for the message used to be
    * written to storage. When the transaction then rolled back and the server restarted, the
    * rolled-back reference could no longer be delivered because the message was already gone.
    * The message must therefore stay in storage until every acknowledge record is committed.
    *
    * How the test maps onto that story:
    *  - "queue1" is consumed by a session created with createSession(false, true, true)
    *    (xa=false, autoCommitSends=true, autoCommitAcks=true), so its acknowledge takes
    *    effect immediately.
    *  - "queue2" is consumed by a session created with createSession(false, false, false),
    *    whose acknowledge is rolled back instead of committed.
    *
    * Expectations checked after each restart:
    *  1. first restart: queue1 is empty, queue2 still delivers the message;
    *  2. second restart (after queue2's copy was acknowledged): both queues are empty.
    */
   @Test
   public void testRolledBackAcknowledgeWithSameMessageAckedByOtherSession() throws Exception {
      final SimpleString address = new SimpleString("testAddress");
      final SimpleString autoAckQueue = new SimpleString("queue1");
      final SimpleString rolledBackQueue = new SimpleString("queue2");

      // First argument is presumably the persistence flag; the restarts below rely on
      // message state surviving a stop/start cycle.
      final ActiveMQServer broker = createServer(true, createDefaultInVMConfig());
      broker.start();

      final ServerLocator serverLocator = createInVMNonHALocator();

      // ---- Phase 1: route one durable message to both queues, ack both, roll one back ----
      ClientSessionFactory initialFactory = createSessionFactory(serverLocator);
      ClientSession autoAckSession = addClientSession(initialFactory.createSession(false, true, true));
      ClientSession transactedSession = addClientSession(initialFactory.createSession(false, false, false));

      autoAckSession.createQueue(new QueueConfiguration(autoAckQueue).setAddress(address));
      autoAckSession.createQueue(new QueueConfiguration(rolledBackQueue).setAddress(address));

      ClientProducer sender = autoAckSession.createProducer(address);
      sender.send(autoAckSession.createMessage(true));

      autoAckSession.start();
      transactedSession.start();

      ClientConsumer autoAckConsumer = autoAckSession.createConsumer(autoAckQueue);
      ClientConsumer transactedConsumer = transactedSession.createConsumer(rolledBackQueue);

      ClientMessage autoAckCopy = autoAckConsumer.receive(1000);
      Assert.assertNotNull(autoAckCopy);

      ClientMessage transactedCopy = transactedConsumer.receive(1000);
      Assert.assertNotNull(transactedCopy);

      // The transacted acknowledge goes first and is deliberately never committed ...
      transactedCopy.acknowledge();
      // ... then the other session acknowledges its copy of the same message ...
      autoAckCopy.acknowledge();
      // ... and finally the uncommitted acknowledge is undone.
      transactedSession.rollback();

      closeSessions(autoAckSession, transactedSession);
      restart(broker);

      // ---- Phase 2: only the rolled-back copy must have survived the restart ----
      ClientSessionFactory secondFactory = createSessionFactory(serverLocator);
      ClientSession queue1Session = addClientSession(secondFactory.createSession(false, true, true));
      ClientSession queue2Session = addClientSession(secondFactory.createSession(false, true, true));

      queue1Session.start();
      queue2Session.start();

      ClientConsumer queue1Consumer = queue1Session.createConsumer(autoAckQueue);
      ClientConsumer queue2Consumer = queue2Session.createConsumer(rolledBackQueue);

      Assert.assertNull(queue1Consumer.receiveImmediate());

      ClientMessage redelivered = queue2Consumer.receive(1000);
      Assert.assertNotNull(redelivered);
      redelivered.acknowledge();

      closeSessions(queue1Session, queue2Session);
      restart(broker);

      // ---- Phase 3: every copy has been acknowledged, so nothing may come back ----
      ClientSessionFactory thirdFactory = createSessionFactory(serverLocator);
      queue1Session = addClientSession(thirdFactory.createSession(false, true, true));
      queue2Session = addClientSession(thirdFactory.createSession(false, true, true));

      queue1Session.start();
      queue2Session.start();

      queue1Consumer = queue1Session.createConsumer(autoAckQueue);
      queue2Consumer = queue2Session.createConsumer(rolledBackQueue);

      Assert.assertNull(queue1Consumer.receiveImmediate());
      Assert.assertNull(queue2Consumer.receiveImmediate());

      closeSessions(queue1Session, queue2Session);
      serverLocator.close();
      broker.stop();
   }

   /* Stops the given server and starts it again. */
   private static void restart(ActiveMQServer server) throws Exception {
      server.stop();
      server.start();
   }

   /* Closes the two sessions in the order given. */
   private static void closeSessions(ClientSession first, ClientSession second) throws Exception {
      first.close();
      second.close();
   }
}