Skip to content
Permalink
Browse files
ARTEMIS-3729 Fix JMS CORE client commit after async sends
  • Loading branch information
brusdev authored and clebertsuconic committed Mar 21, 2022
1 parent 5bba1fb commit d0c550bcd781273df023c19490f31e21f12c2718
Show file tree
Hide file tree
Showing 13 changed files with 218 additions and 12 deletions.
@@ -61,6 +61,11 @@ default boolean isVersionSupportRouting() {
return version >= PacketImpl.ARTEMIS_2_18_0_VERSION;
}

default boolean isVersionSupportCommitV2() {
int version = getChannelVersion();
return version >= PacketImpl.ARTEMIS_2_21_0_VERSION;
}

/**
* Sets the client protocol used on the communication. This will determine if the client has
* support for certain packet types
@@ -85,6 +85,8 @@
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionBindingQueryResponseMessage_V3;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionBindingQueryResponseMessage_V4;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionCloseMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionCommitMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionCommitMessage_V2;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionConsumerCloseMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionConsumerFlowCreditMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionCreateConsumerMessage;
@@ -443,13 +445,17 @@ public void forceDelivery(final ClientConsumer consumer, final long sequence) th

@Override
public void simpleCommit() throws ActiveMQException {
sessionChannel.sendBlocking(new PacketImpl(PacketImpl.SESS_COMMIT), PacketImpl.NULL_RESPONSE);
simpleCommit(true);
}

@Override
public void simpleCommit(boolean block) throws ActiveMQException {
if (block) {
sessionChannel.sendBlocking(new PacketImpl(PacketImpl.SESS_COMMIT), PacketImpl.NULL_RESPONSE);
if (!sessionChannel.getConnection().isVersionSupportCommitV2()) {
sessionChannel.sendBlocking(new SessionCommitMessage(), PacketImpl.NULL_RESPONSE);
} else {
sessionChannel.sendBlocking(new SessionCommitMessage_V2(), PacketImpl.NULL_RESPONSE);
}
} else {
sessionChannel.sendBatched(new PacketImpl(PacketImpl.SESS_COMMIT));
}
@@ -21,6 +21,7 @@
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@@ -132,6 +133,8 @@ protected static String idToString(long code) {

private final List<Interceptor> interceptors;

private final AtomicLong blockingCorrelationID = new AtomicLong(-1);

public ChannelImpl(final CoreRemotingConnection connection,
final long id,
final int confWindowSize,
@@ -481,6 +484,8 @@ public Packet sendBlocking(final Packet packet,
synchronized (sendBlockingLock) {
packet.setChannelID(id);

packet.setCorrelationID(blockingCorrelationID.decrementAndGet());

final ActiveMQBuffer buffer = packet.encode(connection);

lock.lock();
@@ -508,7 +513,7 @@ public Packet sendBlocking(final Packet packet,

long start = System.currentTimeMillis();

while (!closed && (response == null || (response.getType() != PacketImpl.EXCEPTION && response.getType() != expectedPacket)) && toWait > 0) {
while (!closed && (response == null || (response.getType() != PacketImpl.EXCEPTION && (response.getType() != expectedPacket || response.getCorrelationID() != packet.getCorrelationID()))) && toWait > 0) {
try {
sendCondition.await(toWait, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
@@ -61,6 +61,7 @@
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionBindingQueryResponseMessage_V4;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionCloseMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionCommitMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionCommitMessage_V2;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionConsumerCloseMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionConsumerFlowCreditMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionCreateConsumerMessage;
@@ -252,7 +253,11 @@ public Packet decode(byte packetType, CoreRemotingConnection connection) {
break;
}
case SESS_COMMIT: {
packet = new SessionCommitMessage();
if (!connection.isVersionSupportCommitV2()) {
packet = new SessionCommitMessage();
} else {
packet = new SessionCommitMessage_V2();
}
break;
}
case SESS_ROLLBACK: {
@@ -41,6 +41,9 @@ public class PacketImpl implements Packet {
// 2.18.0
public static final int ARTEMIS_2_18_0_VERSION = 131;

// 2.21.0
public static final int ARTEMIS_2_21_0_VERSION = 132;

public static final SimpleString OLD_QUEUE_PREFIX = new SimpleString("jms.queue.");
public static final SimpleString OLD_TEMP_QUEUE_PREFIX = new SimpleString("jms.tempqueue.");
public static final SimpleString OLD_TOPIC_PREFIX = new SimpleString("jms.topic.");
@@ -53,6 +53,8 @@ public void decodeRest(final ActiveMQBuffer buffer) {
super.decodeRest(buffer);
if (buffer.readableBytes() >= DataConstants.SIZE_LONG) {
correlationID = buffer.readLong();
} else {
correlationID = -1;
}
}

@@ -71,11 +73,6 @@ public final boolean isResponseAsync() {
return true;
}

@Override
protected String getPacketString() {
return super.getPacketString() + ", correlationID=" + correlationID;
}

@Override
public int hashCode() {
final int prime = 31;
@@ -0,0 +1,87 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.protocol.core.impl.wireformat;

import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.utils.DataConstants;

public class SessionCommitMessage_V2 extends SessionCommitMessage {

private long correlationID;

@Override
public long getCorrelationID() {
return correlationID;
}

@Override
public void setCorrelationID(long correlationID) {
this.correlationID = correlationID;
}

@Override
public boolean isResponseAsync() {
return true;
}

@Override
public void encodeRest(final ActiveMQBuffer buffer) {
super.encodeRest(buffer);
buffer.writeLong(correlationID);
}

@Override
public void decodeRest(final ActiveMQBuffer buffer) {
super.decodeRest(buffer);
if (buffer.readableBytes() >= DataConstants.SIZE_LONG) {
correlationID = buffer.readLong();
} else {
correlationID = -1;
}
}

@Override
public int expectedEncodeSize() {
return super.expectedEncodeSize() + DataConstants.SIZE_LONG;
}

@Override
public int hashCode() {
final int prime = 31;
int result = super.hashCode();
result = prime * result + (int) (correlationID ^ (correlationID >>> 32));
return result;
}

@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (!super.equals(obj)) {
return false;
}
if (!(obj instanceof SessionCommitMessage_V2)) {
return false;
}
SessionCommitMessage_V2 other = (SessionCommitMessage_V2) obj;
if (correlationID != other.correlationID) {
return false;
}
return true;
}
}
@@ -64,6 +64,8 @@ public void decodeRest(final ActiveMQBuffer buffer) {
super.decodeRest(buffer);
if (buffer.readableBytes() >= DataConstants.SIZE_LONG) {
correlationID = buffer.readLong();
} else {
correlationID = -1;
}
}

@@ -48,6 +48,8 @@ public void decodeRest(final ActiveMQBuffer buffer) {
super.decodeRest(buffer);
if (buffer.readableBytes() >= DataConstants.SIZE_LONG) {
correlationID = buffer.readLong();
} else {
correlationID = -1;
}
}

@@ -20,4 +20,4 @@ activemq.version.minorVersion=${activemq.version.minorVersion}
activemq.version.microVersion=${activemq.version.microVersion}
activemq.version.incrementingVersion=${activemq.version.incrementingVersion}
activemq.version.versionTag=${activemq.version.versionTag}
activemq.version.compatibleVersionList=121,122,123,124,125,126,127,128,129,130,131
activemq.version.compatibleVersionList=121,122,123,124,125,126,127,128,129,130,131,132
@@ -660,7 +660,7 @@ private RoutingType getRoutingTypeFromAddress(SimpleString address) {
}

private boolean requireNullResponseMessage_V1(Packet packet) {
return !packet.isResponseAsync() || channel.getConnection().isVersionBeforeAsyncResponseChange();
return channel.getConnection().isVersionBeforeAsyncResponseChange();
}

private NullResponseMessage createNullResponseMessage_V1(Packet packet) {
@@ -177,7 +177,7 @@
<activemq.version.majorVersion>1</activemq.version.majorVersion>
<activemq.version.minorVersion>0</activemq.version.minorVersion>
<activemq.version.microVersion>0</activemq.version.microVersion>
<activemq.version.incrementingVersion>131,130,129,128,127,126,125,124,123,122</activemq.version.incrementingVersion>
<activemq.version.incrementingVersion>132,131,130,129,128,127,126,125,124,123,122</activemq.version.incrementingVersion>
<activemq.version.versionTag>${project.version}</activemq.version.versionTag>
<ActiveMQ-Version>${project.version}(${activemq.version.incrementingVersion})</ActiveMQ-Version>

@@ -0,0 +1,94 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.activemq.artemis.tests.integration.client;

import org.apache.activemq.artemis.api.core.Interceptor;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import org.apache.activemq.artemis.tests.util.JMSTestBase;
import org.apache.activemq.artemis.tests.util.Wait;
import org.junit.Assert;
import org.junit.Test;

import javax.jms.CompletionListener;
import javax.jms.Connection;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

public class JMSTransactionTest extends JMSTestBase {

@Test(timeout = 60000)
public void testAsyncProduceMessageAndCommit() throws Throwable {
final String queueName = "TEST";
final int messages = 10;

ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory();
cf.setConfirmationWindowSize(1000000);
cf.setBlockOnDurableSend(true);
cf.setBlockOnNonDurableSend(true);

CountDownLatch commitLatch = new CountDownLatch(1);
AtomicInteger sentMessages = new AtomicInteger(0);

server.getRemotingService().addIncomingInterceptor((Interceptor) (packet, connection1) -> {
if (packet.getType() == PacketImpl.SESS_COMMIT) {
commitLatch.countDown();
}
return true;
});

try (Connection connection = cf.createConnection();
Session session = connection.createSession(true, Session.SESSION_TRANSACTED)) {

javax.jms.Queue queue = session.createQueue(queueName);
MessageProducer p = session.createProducer(queue);

for (int i = 0; i < messages; i++) {
TextMessage message = session.createTextMessage();
message.setText("Message:" + i);
p.send(message, new CompletionListener() {
@Override
public void onCompletion(Message message) {
try {
commitLatch.await();
sentMessages.incrementAndGet();
} catch (Exception e) {
e.printStackTrace();
}
}

@Override
public void onException(Message message, Exception exception) {

}
});
}

session.commit();
Assert.assertEquals(messages, sentMessages.get());

org.apache.activemq.artemis.core.server.Queue queueView = server.locateQueue(SimpleString.toSimpleString(queueName));
Wait.assertEquals(messages, queueView::getMessageCount);
}
}
}

0 comments on commit d0c550b

Please sign in to comment.