Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

新增多数据源事务传播机制 #406

Merged
merged 23 commits into from Dec 26, 2022
Merged

Conversation

zhaohaoh
Copy link
Contributor

@zhaohaoh zhaohaoh commented Nov 24, 2021

1.主要提取DynamicLocalTransactionInterceptor中的方法到TransactionalTemplate
TransactionalInfo transactionInfo = transactionalExecutor.getTransactionInfo();
if (!StringUtils.isEmpty(TransactionContext.getXID())) {
return transactionalExecutor.execute();
}
boolean state = true;
Object o;
String xid = LocalTxUtil.startTransaction();
try {
o = transactionalExecutor.execute();
} catch (Exception e) {
state = !isRollback(e, transactionInfo);
throw e;
} finally {
if (state) {
LocalTxUtil.commit(xid);
} else {
LocalTxUtil.rollback(xid);
}
}
return o;
原代理方法改为获取注解属性对象的封装
Method method = methodInvocation.getMethod();
final DSTransactional dsTransactional = method.getAnnotation(DSTransactional.class);

    TransactionalExecutor transactionalExecutor = new TransactionalExecutor() {
        @Override
        public Object execute() throws Throwable {
            return methodInvocation.proceed();
        }

        @Override
        public TransactionalInfo getTransactionInfo() {
            TransactionalInfo transactionInfo = new TransactionalInfo();
            transactionInfo.setPropagation(dsTransactional.propagation());
            transactionInfo.setNoRollbackFor(dsTransactional.noRollbackFor());
            transactionInfo.setRollbackFor(dsTransactional.rollbackFor());
            return transactionInfo;
        }
    }

2.AbstractRoutingDataSource获取数据源中获取事务的数据源的方法加入xid
ConnectionProxy connection = ConnectionFactory.getConnection(ds);
return connection == null ? getConnectionProxy(ds, determineDataSource().getConnection()) : connection;
ConnectionProxy connection = ConnectionFactory.getConnection(xid, ds);
return connection == null ? getConnectionProxy(xid,ds, determineDataSource().getConnection()) : connection;
3,。增加事务异常类TransactionException

public class TransactionException extends RuntimeException {
public TransactionException(String message) {
super(message);
}

public TransactionException(String message, Throwable cause) {
    super(message, cause);
}

}
4.ConnectionFactory 对于事务数据源的处理通过xid保存 通过xid获取指定事务
public class ConnectionFactory {

private static final ThreadLocal<Map<String, ConnectionProxy>> CONNECTION_HOLDER =
        new ThreadLocal<Map<String, ConnectionProxy>>() {
private static final ThreadLocal<Map<String, Map<String, ConnectionProxy>>> CONNECTION_HOLDER =
        new ThreadLocal<Map<String, Map<String, ConnectionProxy>>>() {
            @Override
            protected Map<String, ConnectionProxy> initialValue() {
                return new ConcurrentHashMap<>(8);
            protected Map<String, Map<String, ConnectionProxy>> initialValue() {
                return new ConcurrentHashMap<>();
            }
        };

public static void putConnection(String ds, ConnectionProxy connection) {
    Map<String, ConnectionProxy> concurrentHashMap = CONNECTION_HOLDER.get();
    if (!concurrentHashMap.containsKey(ds)) {
public static void putConnection(String xid, String ds, ConnectionProxy connection) {
    Map<String, Map<String, ConnectionProxy>> concurrentHashMap = CONNECTION_HOLDER.get();
    Map<String, ConnectionProxy> connectionProxyMap = concurrentHashMap.get(xid);
    if (connectionProxyMap == null) {
        connectionProxyMap = new ConcurrentHashMap<>();
        concurrentHashMap.put(xid, connectionProxyMap);
    }
    if (!connectionProxyMap.containsKey(ds)) {
        try {
            connection.setAutoCommit(false);
        } catch (SQLException e) {
            e.printStackTrace();
        }
        concurrentHashMap.put(ds, connection);
        connectionProxyMap.put(ds, connection);
    }
}

public static ConnectionProxy getConnection(String ds) {
    return CONNECTION_HOLDER.get().get(ds);
public static ConnectionProxy getConnection(String xid, String ds) {
    Map<String, Map<String, ConnectionProxy>> concurrentHashMap = CONNECTION_HOLDER.get();
    Map<String, ConnectionProxy> connectionProxyMap = concurrentHashMap.get(xid);
    if (CollectionUtils.isEmpty(connectionProxyMap)) {
        return null;
    }
    return connectionProxyMap.get(ds);
}

public static void notify(Boolean state) {
public static void notify(String xid, Boolean state) {
    Map<String, Map<String, ConnectionProxy>> concurrentHashMap = CONNECTION_HOLDER.get();
    try {
        Map<String, ConnectionProxy> concurrentHashMap = CONNECTION_HOLDER.get();
        for (ConnectionProxy connectionProxy : concurrentHashMap.values()) {
            connectionProxy.notify(state);
        if (CollectionUtils.isEmpty(concurrentHashMap)) {
            return;
        }
        Map<String, ConnectionProxy> connectionProxyMap = concurrentHashMap.get(xid);
        for (ConnectionProxy connectionProxy : connectionProxyMap.values()) {
            if (connectionProxy != null) {
                connectionProxy.notify(state);
            }
        }
    } finally {
        CONNECTION_HOLDER.remove();
        concurrentHashMap.remove(xid);
    }
}

5.DsPropagation 事务传播机制枚举
6.src/main/java/com/baomidou/dynamic/datasource/tx/LocalTxUtil.java 事务工具提交关闭事务加入xid进行获取
public static void startTransaction() {
if (!StringUtils.isEmpty(TransactionContext.getXID())) {
log.debug("dynamic-datasource exist local tx [{}]", TransactionContext.getXID());
public static String startTransaction() {
String xid = TransactionContext.getXID();
if (!StringUtils.isEmpty(xid)) {
log.debug("dynamic-datasource exist local tx [{}]", xid);
} else {
String xid = UUID.randomUUID().toString();
xid = UUID.randomUUID().toString();
TransactionContext.bind(xid);
log.debug("dynamic-datasource start local tx [{}]", xid);
}
return xid;
}

/**
 * 手动提交事务
 */
public static void commit() {
    ConnectionFactory.notify(true);
public static void commit(String xid) {
    ConnectionFactory.notify(xid, true);
    log.debug("dynamic-datasource commit local tx [{}]", TransactionContext.getXID());
    TransactionContext.remove();
}

/**
 * 手动回滚事务
 */
public static void rollback() {
    ConnectionFactory.notify(false);
public static void rollback(String xid) {
    ConnectionFactory.notify(xid, false);
    log.debug("dynamic-datasource rollback local tx [{}]", TransactionContext.getXID());
    TransactionContext.remove();
}

7.事务传播机制使用的事务暂停xid对象 用于事务暂停和恢复xid
package com.baomidou.dynamic.datasource.tx;

import javax.annotation.Nonnull;

public class SuspendedResourcesHolder {
/**
* The xid
*/
private String xid;

public SuspendedResourcesHolder(String xid) {
    if (xid == null) {
        throw new IllegalArgumentException("xid must be not null");
    }
    this.xid = xid;
}

@Nonnull
public String getXid() {
    return xid;
}

}
8。TransactionalExecutor 事务执行器接口
9.TransactionalInfo 对事务注解中属性的封装
10.TransactionalTemplate 事务模板方法
对事务的操作抽取到模板方法中实现。后续可自定义扩展模板方法
处理异常和事务传播机制
package com.baomidou.dynamic.datasource.tx;

import com.baomidou.dynamic.datasource.exception.TransactionException;
import com.baomidou.mybatisplus.core.toolkit.ArrayUtils;
import lombok.extern.slf4j.Slf4j;
import org.springframework.util.StringUtils;

import java.util.Objects;

@slf4j
public class TransactionalTemplate {

public Object execute(TransactionalExecutor transactionalExecutor) throws Throwable {
    TransactionalInfo transactionInfo = transactionalExecutor.getTransactionInfo();
    DsPropagation propagation = transactionInfo.propagation;
    SuspendedResourcesHolder suspendedResourcesHolder = null;
    try {
        switch (propagation) {
            case NOT_SUPPORTED:
                // If transaction is existing, suspend it.
                if (existingTransaction()) {
                    suspendedResourcesHolder = suspend();
                }
                return transactionalExecutor.execute();
            case REQUIRES_NEW:
                // If transaction is existing, suspend it, and then begin new transaction.
                if (existingTransaction()) {
                    suspendedResourcesHolder = suspend();
                }
                // Continue and execute with new transaction
                break;
            case SUPPORTS:
                // If transaction is not existing, execute without transaction.
                if (!existingTransaction()) {
                    return transactionalExecutor.execute();
                }
                // Continue and execute with new transaction
                break;
            case REQUIRED:
                // default
                break;
            case NEVER:
                // If transaction is existing, throw exception.
                if (existingTransaction()) {
                    throw new TransactionException("Existing transaction found for transaction marked with propagation never");
                } else {
                    // Execute without transaction and return.
                    return transactionalExecutor.execute();
                }
            case MANDATORY:
                // If transaction is not existing, throw exception.
                if (!existingTransaction()) {
                    throw new TransactionException("No existing transaction found for transaction marked with propagation 'mandatory'");
                }
                // Continue and execute with current transaction.
                break;
            default:
                throw new TransactionException("Not Supported Propagation:" + propagation);
        }
        return doExecute(transactionalExecutor);
    } finally {
        resume(suspendedResourcesHolder);
    }
}

private Object doExecute(TransactionalExecutor transactionalExecutor) throws Throwable {
    TransactionalInfo transactionInfo = transactionalExecutor.getTransactionInfo();
    if (!StringUtils.isEmpty(TransactionContext.getXID())) {
        return transactionalExecutor.execute();
    }
    boolean state = true;
    Object o;
    String xid = LocalTxUtil.startTransaction();
    try {
        o = transactionalExecutor.execute();
    } catch (Exception e) {
        state = !isRollback(e, transactionInfo);
        throw e;
    } finally {
        if (state) {
            LocalTxUtil.commit(xid);
        } else {
            LocalTxUtil.rollback(xid);
        }
    }
    return o;
}

private boolean isRollback(Throwable e, TransactionalInfo transactionInfo) {
    boolean isRollback = true;
    Class<? extends Throwable>[] rollbacks = transactionInfo.rollbackFor;
    Class<? extends Throwable>[] noRollbackFor = transactionInfo.noRollbackFor;
    if (ArrayUtils.isNotEmpty(noRollbackFor)) {
        for (Class<? extends Throwable> noRollBack : noRollbackFor) {
            int depth = getDepth(e.getClass(), noRollBack);
            if (depth >= 0) {
                return false;
            }
        }
    }
    if (ArrayUtils.isNotEmpty(rollbacks)) {
        for (Class<? extends Throwable> rollback : rollbacks) {
            int depth = getDepth(e.getClass(), rollback);
            if (depth >= 0) {
                return isRollback;
            }
        }
    }
    return false;
}

private int getDepth(Class<?> exceptionClass, Class<? extends Throwable> rollback) {
    if (rollback == Throwable.class || rollback == Exception.class) {
        return 0;
    }
    // If we've gone as far as we can go and haven't found it...
    if (exceptionClass == Throwable.class) {
        return -1;
    }
    if (Objects.equals(exceptionClass, rollback)) {
        return 0;
    }
    return getDepth(exceptionClass.getSuperclass(), rollback);
}

private void resume(SuspendedResourcesHolder suspendedResourcesHolder) {
    if (suspendedResourcesHolder != null) {
        String xid = suspendedResourcesHolder.getXid();
        TransactionContext.bind(xid);
    }
}

public SuspendedResourcesHolder suspend() {
    String xid = TransactionContext.getXID();
    if (xid != null) {
        if (log.isInfoEnabled()) {
            log.info("Suspending current transaction, xid = {}", xid);
        }
        TransactionContext.unbind(xid);
        return new SuspendedResourcesHolder(xid);
    } else {
        return null;
    }
}

public boolean existingTransaction() {
    return !StringUtils.isEmpty(TransactionContext.getXID());
}

}

@zhaohaoh zhaohaoh closed this Nov 24, 2021
@zhaohaoh zhaohaoh reopened this Nov 24, 2021
@zhaohaoh zhaohaoh closed this Nov 24, 2021
@zhaohaoh
Copy link
Contributor Author

多数据源事务传播机制

@huayanYu
Copy link
Member

在玩呢?

@zhaohaoh zhaohaoh reopened this Nov 24, 2021
@zhaohaoh
Copy link
Contributor Author

你好,有看么

@huayanYu
Copy link
Member

恩,改动比较大,暂时不会合并。空了会详细测试。先放这里

@zhaohaoh
Copy link
Contributor Author

好的,需要改动或兼容可以@

@huayanYu
Copy link
Member

有冲突

@zhaohaoh
Copy link
Contributor Author

之前是基于3.4改的,我看下3.5

# Conflicts:
#	src/main/java/com/baomidou/dynamic/datasource/tx/TransactionalTemplate.java
@zhaohaoh
Copy link
Contributor Author

更新了下comment ,冲突主要是方法参数加入了xid,可以替换。还有个冲突是原代理类中的事务的方法抽取到了事务模板方法类中

@zhaohaoh
Copy link
Contributor Author

有考虑么,冲突可以先用我的版本试试

…ce-spring-boot-starter

� Conflicts:
�	src/main/java/com/baomidou/dynamic/datasource/ds/AbstractRoutingDataSource.java
�	src/main/java/com/baomidou/dynamic/datasource/tx/ConnectionFactory.java
�	src/main/java/com/baomidou/dynamic/datasource/tx/LocalTxUtil.java
@zhaohaoh
Copy link
Contributor Author

merge

@huayanYu huayanYu merged commit e2e176d into baomidou:master Dec 26, 2022
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

2 participants