Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ARTEMIS-2559 Connection failure should rollback pending XA TX #2899

Merged
merged 1 commit into from Nov 21, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -139,6 +139,10 @@ public class ServerSessionImpl implements ServerSession, FailureListener {

protected Transaction tx;

/** This will store the Transaction between xaEnd and xaPrepare or xaCommit.
* in a failure scenario (client is gone), this will be held between xaEnd and xaCommit. */
protected volatile Transaction pendingTX;

protected boolean xa;

protected final PagingManager pagingManager;
Expand Down Expand Up @@ -384,13 +388,28 @@ protected void doClose(final boolean failed) throws Exception {
if (closed)
return;

if (tx != null && tx.getXid() == null) {
// We only rollback local txs on close, not XA tx branches
if (failed) {

try {
rollback(failed, false);
} catch (Exception e) {
ActiveMQServerLogger.LOGGER.unableToRollbackOnClose(e);
Transaction txToRollback = tx;
if (txToRollback != null) {
txToRollback.rollbackIfPossible();
}

txToRollback = pendingTX;

if (txToRollback != null) {
txToRollback.rollbackIfPossible();
}

} else {
if (tx != null && tx.getXid() == null) {
// We only rollback local txs on close, not XA tx branches

try {
rollback(failed, false);
} catch (Exception e) {
ActiveMQServerLogger.LOGGER.unableToRollbackOnClose(e);
}
}
}
}
Expand Down Expand Up @@ -1252,6 +1271,7 @@ private Transaction newTransaction(final Xid xid) {

@Override
public synchronized void xaCommit(final Xid xid, final boolean onePhase) throws Exception {
this.pendingTX = null;

if (tx != null && tx.getXid().equals(xid)) {
final String msg = "Cannot commit, session is currently doing work in transaction " + tx.getXid();
Expand Down Expand Up @@ -1310,6 +1330,7 @@ public synchronized void xaEnd(final Xid xid) throws Exception {
throw new ActiveMQXAException(XAException.XAER_PROTO, msg);
}
} else {
this.pendingTX = tx;
tx = null;
}
} else {
Expand Down Expand Up @@ -1395,6 +1416,8 @@ public synchronized void xaResume(final Xid xid) throws Exception {

@Override
public synchronized void xaRollback(final Xid xid) throws Exception {
this.pendingTX = null;

if (tx != null && tx.getXid().equals(xid)) {
final String msg = "Cannot roll back, session is currently doing work in a transaction " + tx.getXid();

Expand Down
Expand Up @@ -51,6 +51,11 @@ enum State {

void rollback() throws Exception;

/** In a ServerSession failure scenario,\
* we may try to rollback, however only if it's not prepared.
* In case it's prepared, we will just let it be and let the transaction manager to deal with it */
void rollbackIfPossible();

long getID();

Xid getXid();
Expand Down
Expand Up @@ -357,6 +357,27 @@ protected void doCommit() throws Exception {
state = State.COMMITTED;
}

@Override
public void rollbackIfPossible() {
synchronized (timeoutLock) {
if (state == State.ROLLEDBACK) {
// I don't think this could happen, but just in case
logger.debug("TransactionImpl::rollbackIfPossible::" + this + " is being ignored");
return;
}
if (state != State.PREPARED) {
try {
internalRollback(sorted);
} catch (Exception e) {
// nothing we can do beyond logging
// no need to special handler here as this was not even supposed to happen at this point
// even if it happenes this would be the exception of the exception, so we just log here
logger.warn(e.getMessage(), e);
}
}
}
}

@Override
public void rollback() throws Exception {
if (logger.isTraceEnabled()) {
Expand Down
@@ -0,0 +1,200 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.xa;

import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ClientConsumer;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientProducer;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.config.StoreConfiguration;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ServerSession;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.utils.Wait;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(Parameterized.class)
public class SessionFailureXATest extends ActiveMQTestBase {

private static IntegrationTestLogger log = IntegrationTestLogger.LOGGER;

private final Map<String, AddressSettings> addressSettings = new HashMap<>();

private ActiveMQServer messagingService;

private ClientSession clientSession;

private ClientSessionFactory sessionFactory;

private Configuration configuration;

private final SimpleString atestq = new SimpleString("BasicXaTestq");

private ServerLocator locator;

private StoreConfiguration.StoreType storeType;

public SessionFailureXATest(StoreConfiguration.StoreType storeType) {
this.storeType = storeType;
}

@Parameterized.Parameters(name = "storeType={0}")
public static Collection<Object[]> data() {
Object[][] params = new Object[][]{{StoreConfiguration.StoreType.FILE}};
return Arrays.asList(params);
}

@Override
@Before
public void setUp() throws Exception {
super.setUp();

addressSettings.clear();

if (storeType == StoreConfiguration.StoreType.DATABASE) {
configuration = createDefaultJDBCConfig(true);
} else {
configuration = createDefaultNettyConfig();
}

messagingService = createServer(true, configuration, -1, -1, addressSettings);

// start the server
messagingService.start();

locator = createInVMNonHALocator();
locator.setAckBatchSize(0);
sessionFactory = createSessionFactory(locator);

clientSession = addClientSession(sessionFactory.createSession(true, false, false));

clientSession.createQueue(atestq, atestq, null, true);
}

@Test
public void testFailureWithXAEnd() throws Exception {
testFailure(true);
}

@Test
public void testFailureWithoutXAEnd() throws Exception {
testFailure(false);
}

public void testFailure(boolean xaEnd) throws Exception {

ClientSession clientSession2 = sessionFactory.createSession(false, true, true);
try {
ClientProducer clientProducer = clientSession2.createProducer(atestq);
ClientMessage m1 = createTextMessage(clientSession2, "m1");
ClientMessage m2 = createTextMessage(clientSession2, "m2");
ClientMessage m3 = createTextMessage(clientSession2, "m3");
ClientMessage m4 = createTextMessage(clientSession2, "m4");
clientProducer.send(m1);
clientProducer.send(m2);
clientProducer.send(m3);
clientProducer.send(m4);
} finally {
clientSession2.close();
}

Xid xid = newXID();
clientSession.start(xid, XAResource.TMNOFLAGS);
clientSession.setTransactionTimeout((int) TimeUnit.MINUTES.toMillis(10));
clientSession.start();
ClientConsumer clientConsumer = clientSession.createConsumer(atestq);
ClientMessage m = clientConsumer.receive(1000);
Assert.assertNotNull(m);
m.acknowledge();
Assert.assertEquals(m.getBodyBuffer().readString(), "m1");
m = clientConsumer.receive(1000);
Assert.assertNotNull(m);
m.acknowledge();
Assert.assertEquals(m.getBodyBuffer().readString(), "m2");
m = clientConsumer.receive(1000);
Assert.assertNotNull(m);
m.acknowledge();
Assert.assertEquals(m.getBodyBuffer().readString(), "m3");
m = clientConsumer.receive(1000);
Assert.assertNotNull(m);
m.acknowledge();
Assert.assertEquals(m.getBodyBuffer().readString(), "m4");
if (xaEnd) {
// We are validating both cases, where xaEnd succeeded and didn't succeed
// so this tests is parameterized to validate both cases.
clientSession.end(xid, XAResource.TMSUCCESS);
}

Wait.assertEquals(1, () -> messagingService.getSessions().size());

for (ServerSession serverSession : messagingService.getSessions()) {
serverSession.getRemotingConnection().fail(new ActiveMQException("fail this"));
serverSession.getRemotingConnection().disconnect(false);
}

Wait.assertEquals(0, () -> messagingService.getSessions().size());

locator = createInVMNonHALocator();
sessionFactory = createSessionFactory(locator);
clientSession = addClientSession(sessionFactory.createSession(true, false, false));

Wait.assertEquals(1, () -> messagingService.getSessions().size());

xid = newXID();

clientSession.start(xid, XAResource.TMNOFLAGS);
clientSession.setTransactionTimeout((int) TimeUnit.MINUTES.toMillis(10));
clientSession.start();
clientConsumer = clientSession.createConsumer(atestq);
m = clientConsumer.receive(1000);
Assert.assertNotNull(m);
m.acknowledge();
Assert.assertEquals(m.getBodyBuffer().readString(), "m1");
m = clientConsumer.receive(1000);
Assert.assertNotNull(m);
m.acknowledge();
Assert.assertEquals(m.getBodyBuffer().readString(), "m2");
m = clientConsumer.receive(1000);
Assert.assertNotNull(m);
m.acknowledge();
Assert.assertEquals(m.getBodyBuffer().readString(), "m3");
m = clientConsumer.receive(1000);
Assert.assertNotNull(m);
m.acknowledge();
Assert.assertEquals(m.getBodyBuffer().readString(), "m4");

}
}
Expand Up @@ -139,6 +139,11 @@ public void commit() throws Exception {

}

@Override
public void rollbackIfPossible() {

}

@Override
public void commit(final boolean onePhase) throws Exception {

Expand Down