Skip to content

Commit

Permalink
AG-227 - Make RecoveryXAResource reconnectable
Browse files Browse the repository at this point in the history
  • Loading branch information
graben authored and barreiro committed Feb 1, 2024
1 parent d538959 commit 342ee87
Show file tree
Hide file tree
Showing 5 changed files with 64 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -88,9 +88,14 @@ public void removeResourceRecoveryFactory(ResourceRecoveryFactory factory) {
*/
interface ResourceRecoveryFactory {

/**
* The transaction layer can call this method to check if recovery is possible.
*/
boolean isRecoverable();

/**
* The transaction layer can call this method to obtain one connection used for recovery of incomplete transactions.
*/
XAConnection getRecoveryConnection();
XAConnection getRecoveryConnection() throws SQLException;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

import org.jboss.tm.XAResourceWrapper;

import javax.sql.XAConnection;
import javax.transaction.xa.XAException;
import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid;
Expand All @@ -13,21 +12,16 @@ public class ErrorConditionXAResource implements AutoCloseable, XAResourceWrappe
private static final String PRODUCT_NAME = ErrorConditionXAResource.class.getPackage().getImplementationTitle();
private static final String PRODUCT_VERSION = ErrorConditionXAResource.class.getPackage().getImplementationVersion();

private final XAConnection xaConnection;
private final SQLException error;
private final String jndiName;

public ErrorConditionXAResource(XAConnection xaConnection, SQLException error, String jndiName) {
this.xaConnection = xaConnection;
public ErrorConditionXAResource(SQLException error, String jndiName) {
this.error = error;
this.jndiName = jndiName;
}

@Override
public Xid[] recover(int flag) throws XAException {
if ( flag == TMENDRSCAN ) {
close();
}
throw XAExceptionUtils.xaException( XAException.XAER_RMFAIL, error );
}

Expand Down Expand Up @@ -80,11 +74,7 @@ public void start(Xid xid, int flags) throws XAException {

@Override
public void close() throws XAException {
try {
xaConnection.close();
} catch ( SQLException e ) {
throw XAExceptionUtils.xaException( XAException.XAER_RMFAIL, e );
}
// no-op
}

// --- //
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
import org.jboss.tm.XAResourceRecovery;
import org.jboss.tm.XAResourceRecoveryRegistry;

import javax.sql.XAConnection;
import javax.transaction.xa.XAResource;
import java.sql.SQLException;
import java.util.UUID;
Expand Down Expand Up @@ -229,11 +228,10 @@ private static class AgroalXAResourceRecovery implements XAResourceRecovery {
@Override
@SuppressWarnings( "resource" )
public XAResource[] getXAResources() {
XAConnection xaConnection = connectionFactory.getRecoveryConnection();
try {
return xaConnection == null ? EMPTY_RESOURCES : new XAResource[]{new RecoveryXAResource( xaConnection, name )};
return connectionFactory.isRecoverable() ? new XAResource[]{new RecoveryXAResource( connectionFactory, name )} : EMPTY_RESOURCES;
} catch ( SQLException e ) {
return new XAResource[]{new ErrorConditionXAResource( xaConnection, e, name )};
return new XAResource[]{new ErrorConditionXAResource( e, name )};
}
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package io.agroal.narayana;

import io.agroal.api.transaction.TransactionIntegration.ResourceRecoveryFactory;
import org.jboss.tm.XAResourceWrapper;

import javax.sql.XAConnection;
Expand All @@ -13,22 +14,36 @@ public class RecoveryXAResource implements AutoCloseable, XAResourceWrapper {
private static final String PRODUCT_NAME = RecoveryXAResource.class.getPackage().getImplementationTitle();
private static final String PRODUCT_VERSION = RecoveryXAResource.class.getPackage().getImplementationVersion();

private final XAResource wrappedResource;
private final ResourceRecoveryFactory connectionFactory;
private final String jndiName;
private XAConnection xaConnection;
private XAResource wrappedResource;

public RecoveryXAResource(XAConnection connection, String name) throws SQLException {
xaConnection = connection;
wrappedResource = connection.getXAResource();
public RecoveryXAResource(ResourceRecoveryFactory factory, String name) throws SQLException {
connectionFactory = factory;
jndiName = name;
connect();
}

private void connect() throws SQLException {
if ( wrappedResource == null ) {
xaConnection = connectionFactory.getRecoveryConnection();
wrappedResource = xaConnection.getXAResource();
}
}

private XAResource getConnectedResource() throws XAException {
try {
connect();
} catch ( SQLException e ) {
throw XAExceptionUtils.xaException( XAException.XAER_RMFAIL, e );
}
return wrappedResource;
}

@Override
public Xid[] recover(int flag) throws XAException {
if ( xaConnection == null ) {
throw XAExceptionUtils.xaException( XAException.XAER_RMFAIL );
}
Xid[] value = wrappedResource.recover( flag );
Xid[] value = getConnectedResource().recover( flag );
if ( flag == TMENDRSCAN && ( value == null || value.length == 0 ) ) {
close();
}
Expand All @@ -37,47 +52,47 @@ public Xid[] recover(int flag) throws XAException {

@Override
public void commit(Xid xid, boolean onePhase) throws XAException {
wrappedResource.commit( xid, onePhase );
getConnectedResource().commit( xid, onePhase );
}

@Override
public void end(Xid xid, int flags) throws XAException {
wrappedResource.end( xid, flags );
getConnectedResource().end( xid, flags );
}

@Override
public void forget(Xid xid) throws XAException {
wrappedResource.forget( xid );
getConnectedResource().forget( xid );
}

@Override
public int getTransactionTimeout() throws XAException {
return wrappedResource.getTransactionTimeout();
return getConnectedResource().getTransactionTimeout();
}

@Override
public boolean isSameRM(XAResource xares) throws XAException {
return wrappedResource.isSameRM( xares );
return getConnectedResource().isSameRM( xares );
}

@Override
public int prepare(Xid xid) throws XAException {
return wrappedResource.prepare( xid );
return getConnectedResource().prepare( xid );
}

@Override
public void rollback(Xid xid) throws XAException {
wrappedResource.rollback( xid );
getConnectedResource().rollback( xid );
}

@Override
public boolean setTransactionTimeout(int seconds) throws XAException {
return wrappedResource.setTransactionTimeout( seconds );
return getConnectedResource().setTransactionTimeout( seconds );
}

@Override
public void start(Xid xid, int flags) throws XAException {
wrappedResource.start( xid, flags );
getConnectedResource().start( xid, flags );
}

// --- //
Expand All @@ -90,14 +105,19 @@ public void close() throws XAException {
throw XAExceptionUtils.xaException( XAException.XAER_RMFAIL, e );
} finally {
xaConnection = null;
wrappedResource = null;
}
}

// --- //

@Override
public XAResource getResource() {
return wrappedResource;
try {
return getConnectedResource();
} catch (XAException e) {
throw new IllegalStateException(e);
}
}

@Override
Expand Down
27 changes: 16 additions & 11 deletions agroal-pool/src/main/java/io/agroal/pool/ConnectionFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import io.agroal.pool.util.XAConnectionAdaptor;

import javax.sql.XAConnection;
import javax.transaction.xa.XAException;
import java.lang.reflect.InvocationTargetException;
import java.security.Principal;
import java.sql.Connection;
Expand Down Expand Up @@ -269,18 +270,22 @@ private XAConnection xaConnectionSetup(XAConnection xaConnection) throws SQLExce
// --- //

@Override
@SuppressWarnings( "StringConcatenation" )
public XAConnection getRecoveryConnection() {
try {
if ( factoryMode == Mode.XA_DATASOURCE ) {
injectJdbcProperties( xaRecoveryDataSource, recoveryProperties() );
return xaRecoveryDataSource.getXAConnection();
}
fireOnWarning( listeners, "Recovery connections are only available for XADataSource" );
} catch ( SQLException e ) {
fireOnWarning( listeners, "Unable to get recovery connection: " + e.getMessage() );
public boolean isRecoverable() {
if ( factoryMode == Mode.XA_DATASOURCE ) {
return true;
}
fireOnWarning( listeners, "Recovery connections are only available for XADataSource" );
return false;
}

@Override
public XAConnection getRecoveryConnection() throws SQLException {
if ( isRecoverable() ) {
injectJdbcProperties(xaRecoveryDataSource, recoveryProperties());
return xaRecoveryDataSource.getXAConnection();
}
return null;
// Fallback for wrong implemented TransactionIntegration
throw new SQLException( "Recovery connections are only available for XADataSource" );
}

// --- //
Expand Down

0 comments on commit 342ee87

Please sign in to comment.