Skip to content

Commit

Permalink
simplify SingleXADataSource (#3535)
Browse files Browse the repository at this point in the history
* simplify XAShardingTransactionManagerTest

* create AtomikosDataSourceBean using DataSourceUtils

* decouple SingXADataSource from XADataSource interface

* SingleXADataSource => XATransactionDataSource

* decouple SingleXAConnection from XAConnection interface

* SingleXAConnection => XATransactionConnection

* do not support XADatasource construct

* refactor XATransactionDataSource

* for checkstyle
  • Loading branch information
cherrylzhao authored and terrymanu committed Nov 16, 2019
1 parent d84e84f commit bf68120
Show file tree
Hide file tree
Showing 9 changed files with 149 additions and 382 deletions.
Expand Up @@ -23,8 +23,8 @@
import org.apache.shardingsphere.transaction.core.ResourceDataSource;
import org.apache.shardingsphere.transaction.core.TransactionType;
import org.apache.shardingsphere.transaction.spi.ShardingTransactionManager;
import org.apache.shardingsphere.transaction.xa.jta.connection.SingleXAConnection;
import org.apache.shardingsphere.transaction.xa.jta.datasource.SingleXADataSource;
import org.apache.shardingsphere.transaction.xa.jta.connection.XATransactionConnection;
import org.apache.shardingsphere.transaction.xa.jta.datasource.XATransactionDataSource;
import org.apache.shardingsphere.transaction.xa.manager.XATransactionManagerLoader;
import org.apache.shardingsphere.transaction.xa.spi.XATransactionManager;

Expand All @@ -45,7 +45,7 @@
*/
public final class XAShardingTransactionManager implements ShardingTransactionManager {

private final Map<String, SingleXADataSource> singleXADataSourceMap = new HashMap<>();
private final Map<String, XATransactionDataSource> cachedDataSources = new HashMap<>();

private final XATransactionManager xaTransactionManager = XATransactionManagerLoader.getInstance().getTransactionManager();

Expand All @@ -63,9 +63,9 @@ public void init(final DatabaseType databaseType, final Collection<ResourceDataS
if (dataSource instanceof AtomikosDataSourceBean) {
continue;
}
SingleXADataSource singleXADataSource = new SingleXADataSource(databaseType, each.getUniqueResourceName(), dataSource);
singleXADataSourceMap.put(each.getOriginalName(), singleXADataSource);
xaTransactionManager.registerRecoveryResource(each.getUniqueResourceName(), singleXADataSource.getXaDataSource());
XATransactionDataSource xaTransactionDataSource = new XATransactionDataSource(databaseType, each.getUniqueResourceName(), dataSource);
cachedDataSources.put(each.getOriginalName(), xaTransactionDataSource);
xaTransactionManager.registerRecoveryResource(each.getUniqueResourceName(), xaTransactionDataSource.getXaDataSource());
}
xaTransactionManager.init();
}
Expand All @@ -83,12 +83,12 @@ public boolean isInTransaction() {

@Override
public Connection getConnection(final String dataSourceName) throws SQLException {
SingleXAConnection singleXAConnection = singleXADataSourceMap.get(dataSourceName).getXAConnection();
XATransactionConnection transactionConnection = cachedDataSources.get(dataSourceName).getConnection();
if (!enlistedXAResource.get().contains(dataSourceName)) {
xaTransactionManager.enlistResource(singleXAConnection.getXAResource());
xaTransactionManager.enlistResource(transactionConnection.getXAResource());
enlistedXAResource.get().add(dataSourceName);
}
return singleXAConnection.getConnection();
return transactionConnection.getTargetConnection();
}

@SneakyThrows
Expand Down Expand Up @@ -119,10 +119,10 @@ public void rollback() {

@Override
public void close() throws Exception {
for (SingleXADataSource each : singleXADataSourceMap.values()) {
for (XATransactionDataSource each : cachedDataSources.values()) {
xaTransactionManager.removeRecoveryResource(each.getResourceName(), each.getXaDataSource());
}
singleXADataSourceMap.clear();
cachedDataSources.clear();
xaTransactionManager.close();
enlistedXAResource = null;
}
Expand Down
Expand Up @@ -20,58 +20,40 @@
import lombok.RequiredArgsConstructor;
import org.apache.shardingsphere.transaction.xa.spi.SingleXAResource;

import javax.sql.ConnectionEventListener;
import javax.sql.StatementEventListener;
import javax.sql.XAConnection;
import java.sql.Connection;
import java.sql.SQLException;

/**
* Single XA Connection.
* XA transaction connection.
*
* @author zhaojun
*/
@RequiredArgsConstructor
public final class SingleXAConnection implements XAConnection {
public final class XATransactionConnection {

private final String resourceName;

private final Connection connection;

private final XAConnection xaConnection;

@Override
/**
* Get XA resource.
*
* @return single XA resource
* @throws SQLException SQL exception
*/
public SingleXAResource getXAResource() throws SQLException {
return new SingleXAResource(resourceName, xaConnection.getXAResource());
}

@Override
public Connection getConnection() {
/**
* Get target connection.
*
* @return target connection
*/
public Connection getTargetConnection() {
return connection;
}

@Override
public void close() throws SQLException {
xaConnection.close();
}

@Override
public void addConnectionEventListener(final ConnectionEventListener listener) {
xaConnection.addConnectionEventListener(listener);
}

@Override
public void removeConnectionEventListener(final ConnectionEventListener listener) {
xaConnection.removeConnectionEventListener(listener);
}

@Override
public void addStatementEventListener(final StatementEventListener listener) {
xaConnection.addStatementEventListener(listener);
}

@Override
public void removeStatementEventListener(final StatementEventListener listener) {
xaConnection.removeStatementEventListener(listener);
}
}

This file was deleted.

Expand Up @@ -19,8 +19,8 @@

import lombok.Getter;
import org.apache.shardingsphere.spi.database.DatabaseType;
import org.apache.shardingsphere.transaction.xa.jta.connection.SingleXAConnection;
import org.apache.shardingsphere.transaction.xa.jta.connection.XAConnectionFactory;
import org.apache.shardingsphere.transaction.xa.jta.connection.XATransactionConnection;

import javax.sql.DataSource;
import javax.sql.XAConnection;
Expand All @@ -29,11 +29,11 @@
import java.sql.SQLException;

/**
* Single XA data source.
* XA transaction data source.
*
* @author zhaojun
*/
public final class SingleXADataSource extends AbstractUnsupportedSingleXADataSource {
public final class XATransactionDataSource {

@Getter
private final String resourceName;
Expand All @@ -45,42 +45,22 @@ public final class SingleXADataSource extends AbstractUnsupportedSingleXADataSou

private final DataSource originalDataSource;

private final boolean isOriginalXADataSource;

public SingleXADataSource(final DatabaseType databaseType, final String resourceName, final XADataSource xaDataSource) {
this.databaseType = databaseType;
this.resourceName = resourceName;
this.xaDataSource = xaDataSource;
this.originalDataSource = null;
this.isOriginalXADataSource = true;
}

public SingleXADataSource(final DatabaseType databaseType, final String resourceName, final DataSource dataSource) {
public XATransactionDataSource(final DatabaseType databaseType, final String resourceName, final DataSource dataSource) {
this.databaseType = databaseType;
this.resourceName = resourceName;
originalDataSource = dataSource;
if (dataSource instanceof XADataSource) {
xaDataSource = (XADataSource) dataSource;
isOriginalXADataSource = true;
} else {
xaDataSource = XADataSourceFactory.build(databaseType, dataSource);
isOriginalXADataSource = false;
}
}

@Override
public SingleXAConnection getXAConnection() throws SQLException {
return isOriginalXADataSource ? getXAConnectionFromXADataSource() : getXAConnectionFromNoneXADataSource();
}

private SingleXAConnection getXAConnectionFromXADataSource() throws SQLException {
XAConnection xaConnection = xaDataSource.getXAConnection();
return new SingleXAConnection(resourceName, xaConnection.getConnection(), xaConnection);
xaDataSource = XADataSourceFactory.build(databaseType, dataSource);
}

private SingleXAConnection getXAConnectionFromNoneXADataSource() throws SQLException {
/**
* Get XA connection.
*
* @return XA transaction connection
* @throws SQLException SQL exception
*/
public XATransactionConnection getConnection() throws SQLException {
Connection originalConnection = originalDataSource.getConnection();
XAConnection xaConnection = XAConnectionFactory.createXAConnection(databaseType, xaDataSource, originalConnection);
return new SingleXAConnection(resourceName, originalConnection, xaConnection);
return new XATransactionConnection(resourceName, originalConnection, xaConnection);
}
}

0 comments on commit bf68120

Please sign in to comment.