Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

optimize: distinguish database behavior according to the branch type #2415

Merged
merged 39 commits into from
Apr 27, 2020
Merged
Show file tree
Hide file tree
Changes from 36 commits
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
92a1e4b
optimize: optimize transaction context behavior when transaction mode…
booogu Feb 26, 2020
7829ce7
adjust BranchType switching with Propagation feature
booogu Feb 29, 2020
f4e4078
Merge remote-tracking branch 'upstream/develop' into f_tcc
booogu Mar 2, 2020
ef1ea3c
format code style
booogu Mar 2, 2020
65312dd
Merge branch 'develop' into f_tcc
zjinlei Mar 5, 2020
d3f1483
Merge branch 'develop' into f_tcc
zjinlei Mar 6, 2020
9bbdd1f
fix the mistake in sagaBranch and rename defaultBranchType
booogu Mar 7, 2020
8fb6dc2
handle required branchType
booogu Mar 7, 2020
0596108
Merge branch 'develop' into f_tcc
zjinlei Mar 7, 2020
52335c8
Merge branch 'develop' into f_tcc
zjinlei Mar 10, 2020
57e7dfd
Merge branch 'develop' into f_tcc
zjinlei Mar 11, 2020
06e79ea
use StringUtils
booogu Mar 12, 2020
d0b3128
Merge remote-tracking branch 'upstream/develop' into f_tcc
booogu Mar 17, 2020
e721fb1
optimize: optimize interceptorType to branchType
booogu Mar 17, 2020
f6a1cb6
remove unuse
booogu Mar 17, 2020
03d9a95
optimize rootcontext
booogu Mar 18, 2020
a3837de
optimize branchType to tccScope
booogu Mar 19, 2020
6030169
Merge remote-tracking branch 'upstream/develop' into f_tcc
booogu Mar 19, 2020
fd12c62
optimize branchType and tccScope
booogu Mar 19, 2020
c50587f
format some incorrect code
booogu Mar 19, 2020
97510c0
format code style
booogu Mar 19, 2020
657c45e
format code style
booogu Mar 19, 2020
3d9a032
Merge branch 'develop' into f_tcc
zjinlei Mar 19, 2020
5afdfc2
Merge branch 'develop' into f_tcc
slievrly Mar 23, 2020
d5dd5cd
Merge remote-tracking branch 'upstream/develop' into f_tcc
booogu Mar 25, 2020
3a0613b
merge latest code
booogu Mar 25, 2020
4785c86
add SAGA in requireUndoFunction judgement conditions in
booogu Apr 21, 2020
640477d
Merge branch 'develop' into f_tcc
zjinlei Apr 21, 2020
bd8b7eb
fix style
zjinlei Apr 21, 2020
9979b00
format code and keep code same in integration module
booogu Apr 22, 2020
95ddbe2
Merge branch 'f_tcc' of https://github.com/CharmingRabbit/seata into …
booogu Apr 22, 2020
7666bc1
Merge branch 'develop' into f_tcc
zjinlei Apr 24, 2020
127b3e8
optimize log output and func name
booogu Apr 24, 2020
d7d249b
Merge branch 'f_tcc' of https://github.com/CharmingRabbit/seata into …
booogu Apr 24, 2020
69f29b7
restore
zjinlei Apr 24, 2020
7e86dd3
Merge branch 'develop' into f_tcc
zjinlei Apr 26, 2020
f5c438e
optimize func name and log output
booogu Apr 26, 2020
7d41df6
Merge branch 'f_tcc' of https://github.com/CharmingRabbit/seata into …
booogu Apr 26, 2020
970e86f
remove travis -p param
booogu Apr 27, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
105 changes: 45 additions & 60 deletions core/src/main/java/io/seata/core/context/RootContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,14 @@
*/
package io.seata.core.context;

import java.util.Map;

import io.seata.common.exception.ShouldNeverHappenException;
import io.seata.common.util.StringUtils;
import io.seata.core.model.BranchType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Map;

/**
* The type Root context.
Expand All @@ -41,7 +42,10 @@ private RootContext() {
*/
public static final String KEY_XID = "TX_XID";

public static final String KEY_XID_INTERCEPTOR_TYPE = "tx-xid-interceptor-type";
/**
* The constant KEY_BRANCH_TYPE
*/
public static final String KEY_BRANCH_TYPE = "TX_BRANCH_TYPE";

public static final String KEY_GLOBAL_LOCK_FLAG = "TX_LOCK";

Expand All @@ -57,24 +61,9 @@ public static String getXID() {
if (StringUtils.isNotBlank(xid)) {
return xid;
}

String xidType = CONTEXT_HOLDER.get(KEY_XID_INTERCEPTOR_TYPE);
if (StringUtils.isNotBlank(xidType) && xidType.contains("_")) {
return xidType.split("_")[0];
}

return null;
}

/**
* Gets xid.
*
* @return the xid
*/
public static String getXIDInterceptorType() {
return CONTEXT_HOLDER.get(KEY_XID_INTERCEPTOR_TYPE);
}

/**
* Bind.
*
Expand All @@ -87,36 +76,6 @@ public static void bind(String xid) {
CONTEXT_HOLDER.put(KEY_XID, xid);
}

/**
* Bind interceptor type
*
* @param xidType
*/
public static void bindInterceptorType(String xidType) {
if (StringUtils.isNotBlank(xidType)) {

String[] xidTypes = xidType.split("_");

if (xidTypes.length == 2) {
bindInterceptorType(xidTypes[0], BranchType.valueOf(xidTypes[1]));
}
}
}

/**
* Bind interceptor type
*
* @param xid
* @param branchType
*/
public static void bindInterceptorType(String xid, BranchType branchType) {
String xidType = String.format("%s_%s", xid, branchType.name());
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("bind interceptor type xid={} branchType={}", xid, branchType);
}
CONTEXT_HOLDER.put(KEY_XID_INTERCEPTOR_TYPE, xidType);
}

/**
* declare local transactions will use global lock check for update/delete/insert/selectForUpdate SQL
*/
Expand All @@ -143,19 +102,6 @@ public static String unbind() {
return xid;
}

/**
* Unbind temporary string
*
* @return the string
*/
public static String unbindInterceptorType() {
String xidType = CONTEXT_HOLDER.remove(KEY_XID_INTERCEPTOR_TYPE);
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("unbind inteceptor type {}", xidType);
}
return xidType;
}

public static void unbindGlobalLockFlag() {
String lockFlag = CONTEXT_HOLDER.remove(KEY_GLOBAL_LOCK_FLAG);
if (LOGGER.isDebugEnabled() && lockFlag != null) {
Expand All @@ -172,6 +118,45 @@ public static boolean inGlobalTransaction() {
return CONTEXT_HOLDER.get(KEY_XID) != null;
}

/**
* get the branch type
*
* @return the branch type String
*/
public static String getBranchType() {
String branchType = CONTEXT_HOLDER.get(KEY_BRANCH_TYPE);
if (StringUtils.isNotBlank(branchType)) {
return branchType;
}
return null;
}

/**
* bind branch type
*
* @param branchType the branch type
*/
public static void bindBranchType(BranchType branchType) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("bind branch type {}", branchType);
}

CONTEXT_HOLDER.put(KEY_BRANCH_TYPE, branchType.name());
}

/**
* unbind branch type
*
* @return the previous branch type string
*/
public static String unbindBranchType() {
String unbindBranchType = CONTEXT_HOLDER.remove(KEY_BRANCH_TYPE);
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("unbind branch type {}", unbindBranchType);
}
return unbindBranchType;
}

/**
* requires global lock check
*
Expand Down
14 changes: 14 additions & 0 deletions core/src/test/java/io/seata/core/context/RootContextTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import io.seata.common.exception.ShouldNeverHappenException;

import io.seata.core.model.BranchType;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

Expand All @@ -31,6 +32,8 @@ public class RootContextTest {

private final String DEFAULT_XID = "default_xid";

private final BranchType DEFAULT_BRANCH_TYPE = BranchType.AT;

/**
* Test bind and unbind.
*/
Expand Down Expand Up @@ -94,4 +97,15 @@ public void testAssertNotInGlobalTransaction() {
assertThat(RootContext.getXID()).isNull();
}

@Test
public void testBindBranchType_And_UnbindBranchType(){
assertThat(RootContext.getBranchType()).isNull();
assertThat(RootContext.unbindBranchType()).isNull();
RootContext.bindBranchType(DEFAULT_BRANCH_TYPE);
assertThat(RootContext.unbindBranchType()).isEqualTo(DEFAULT_BRANCH_TYPE.name());
RootContext.unbindBranchType();
assertThat(RootContext.getBranchType()).isNull();
assertThat(RootContext.unbindBranchType()).isNull();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,10 @@
import com.alibaba.dubbo.rpc.Result;
import com.alibaba.dubbo.rpc.RpcContext;
import com.alibaba.dubbo.rpc.RpcException;
import io.seata.common.util.StringUtils;
import io.seata.core.context.RootContext;
import io.seata.core.constants.DubboConstants;
import io.seata.core.model.BranchType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -43,45 +45,51 @@ public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcExcept
return invoker.invoke(invocation);
}
String xid = RootContext.getXID();
String xidInterceptorType = RootContext.getXIDInterceptorType();
String branchType = RootContext.getBranchType();

String rpcXid = getRpcXid();
String rpcXidInterceptorType = RpcContext.getContext().getAttachment(RootContext.KEY_XID_INTERCEPTOR_TYPE);
String rpcBranchType = RpcContext.getContext().getAttachment(RootContext.KEY_BRANCH_TYPE);
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("xid in RootContext[{}] xid in RpcContext[{}]", xid, rpcXid);
}
boolean bind = false;
if (xid != null) {
RpcContext.getContext().setAttachment(RootContext.KEY_XID, xid);
RpcContext.getContext().setAttachment(RootContext.KEY_XID_INTERCEPTOR_TYPE, xidInterceptorType);
RpcContext.getContext().setAttachment(RootContext.KEY_BRANCH_TYPE, branchType);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it better to separate and judge?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It will not be processed this time, and transaction context object encapsulation will be provided later to handle log output uniformly.

} else {
if (rpcXid != null) {
RootContext.bind(rpcXid);
RootContext.bindInterceptorType(rpcXidInterceptorType);
if (StringUtils.equals(BranchType.TCC.name(), rpcBranchType)) {
RootContext.bindBranchType(BranchType.TCC);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Saga branch never set ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have discussed with the author of saga. This pr is only responsible for the branchType judgment logic in ExecuteTemplate and the TCC branchType passing in Filter. The specific Saga branchtype assignment and other logic will be provided by the feature saga related pr

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this #2551, autor is wangliang1986

}
bind = true;
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("bind[{}] interceptorType[{}] to RootContext", rpcXid, rpcXidInterceptorType);
LOGGER.debug("bind xid [{}] branchType [{}] to RootContext", rpcXid, rpcBranchType);
}
}
}
try {
return invoker.invoke(invocation);

} finally {
if (bind) {
String unbindInterceptorType = RootContext.unbindInterceptorType();
String unbindXid = RootContext.unbind();
String previousBranchType = RootContext.getBranchType();
if (StringUtils.equals(BranchType.TCC.name(), previousBranchType)) {
RootContext.unbindBranchType();
}
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("unbind[{}] interceptorType[{}] from RootContext", unbindXid, unbindInterceptorType);
LOGGER.debug("unbind xid [{}] branchType [{}] from RootContext", unbindXid, previousBranchType);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

BranchType print null in AT mode is not very elegant.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about judge in the log output temporarily,and print "AT" if it is null

}
if (!rpcXid.equalsIgnoreCase(unbindXid)) {
LOGGER.warn("xid in change during RPC from {} to {}, xidInterceptorType from {} to {} ", rpcXid,
unbindXid, rpcXidInterceptorType, unbindInterceptorType);
LOGGER.warn("xid in change during RPC from {} to {},branchType from {} to {}", rpcXid, unbindXid,
rpcBranchType, previousBranchType);
if (unbindXid != null) {
RootContext.bind(unbindXid);
RootContext.bindInterceptorType(unbindInterceptorType);
LOGGER.warn("bind [{}] interceptorType[{}] back to RootContext", unbindXid,
unbindInterceptorType);
LOGGER.warn("bind xid [{}] back to RootContext", unbindXid);
if (StringUtils.equals(BranchType.TCC.name(), previousBranchType)) {
RootContext.bindBranchType(BranchType.TCC);
LOGGER.warn("bind branchType [{}] back to RootContext", previousBranchType);
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@
*/
package io.seata.integration.dubbo;

import io.seata.common.util.StringUtils;
import io.seata.core.context.RootContext;
import io.seata.core.model.BranchType;
import org.apache.dubbo.common.extension.Activate;
import org.apache.dubbo.rpc.Filter;
import org.apache.dubbo.rpc.Invocation;
Expand All @@ -40,42 +42,51 @@ public class ApacheDubboTransactionPropagationFilter implements Filter {
@Override
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
String xid = RootContext.getXID();
String xidInterceptorType = RootContext.getXIDInterceptorType();
String branchType = RootContext.getBranchType();

String rpcXid = getRpcXid();
String rpcXidInterceptorType = RpcContext.getContext().getAttachment(RootContext.KEY_XID_INTERCEPTOR_TYPE);
String rpcBranchType = RpcContext.getContext().getAttachment(RootContext.KEY_BRANCH_TYPE);
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("xid in RootContext[{}] xid in RpcContext[{}]", xid, rpcXid);
}
boolean bind = false;
if (xid != null) {
RpcContext.getContext().setAttachment(RootContext.KEY_XID, xid);
RpcContext.getContext().setAttachment(RootContext.KEY_XID_INTERCEPTOR_TYPE, xidInterceptorType);
RpcContext.getContext().setAttachment(RootContext.KEY_BRANCH_TYPE, branchType);
} else {
if (rpcXid != null) {
RootContext.bind(rpcXid);
RootContext.bindInterceptorType(rpcXidInterceptorType);
if (StringUtils.equals(BranchType.TCC.name(), rpcBranchType)) {
RootContext.bindBranchType(BranchType.TCC);
}
bind = true;
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("bind[{}] interceptorType[{}] to RootContext", rpcXid, rpcXidInterceptorType);
LOGGER.debug("bind xid [{}] branchType [{}] to RootContext", rpcXid, rpcBranchType);
}
}
}
try {
return invoker.invoke(invocation);
} finally {
if (bind) {
String unbindInterceptorType = RootContext.unbindInterceptorType();
String unbindXid = RootContext.unbind();
String previousBranchType = RootContext.getBranchType();
if (StringUtils.equals(BranchType.TCC.name(), previousBranchType)) {
RootContext.unbindBranchType();
}
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("unbind[{}] interceptorType[{}] from RootContext", unbindXid, unbindInterceptorType);
LOGGER.debug("unbind xid [{}] branchType [{}] from RootContext", unbindXid, previousBranchType);
}
if (!rpcXid.equalsIgnoreCase(unbindXid)) {
LOGGER.warn("xid in change during RPC from {} to {}, xidInterceptorType from {} to {} ", rpcXid, unbindXid, rpcXidInterceptorType, unbindInterceptorType);
LOGGER.warn("xid in change during RPC from {} to {},branchType from {} to {}", rpcXid, unbindXid,
rpcBranchType, previousBranchType);
if (unbindXid != null) {
RootContext.bind(unbindXid);
RootContext.bindInterceptorType(unbindInterceptorType);
LOGGER.warn("bind [{}] interceptorType[{}] back to RootContext", unbindXid, unbindInterceptorType);
LOGGER.warn("bind xid [{}] back to RootContext", unbindXid);
if (StringUtils.equals(BranchType.TCC.name(), previousBranchType)) {
RootContext.bindBranchType(BranchType.TCC);
LOGGER.warn("bind branchType [{}] back to RootContext", previousBranchType);
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,10 @@
*/
package io.seata.rm.datasource.exec;

import io.seata.common.util.StringUtils;
import io.seata.common.util.CollectionUtils;
import io.seata.core.context.RootContext;
import io.seata.core.model.BranchType;
import io.seata.rm.datasource.StatementProxy;
import io.seata.rm.datasource.sql.SQLVisitorFactory;
import io.seata.sqlparser.SQLRecognizer;
Expand Down Expand Up @@ -54,7 +56,7 @@ public static <T, S extends Statement> T execute(StatementProxy<S> statementProx
*
* @param <T> the type parameter
* @param <S> the type parameter
* @param sqlRecognizer the sql recognizer
* @param sqlRecognizers the sql recognizer list
* @param statementProxy the statement proxy
* @param statementCallback the statement callback
* @param args the args
Expand All @@ -66,7 +68,7 @@ public static <T, S extends Statement> T execute(List<SQLRecognizer> sqlRecogniz
StatementCallback<T, S> statementCallback,
Object... args) throws SQLException {

if (!RootContext.inGlobalTransaction() && !RootContext.requireGlobalLock()) {
if (!requiresLockOrUndoFunction()) {
// Just work as original statement
return statementCallback.execute(statementProxy.getTargetStatement(), args);
}
Expand Down Expand Up @@ -115,4 +117,17 @@ public static <T, S extends Statement> T execute(List<SQLRecognizer> sqlRecogniz
}
return rs;
}

private static boolean requiresLockOrUndoFunction() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Other more meaningful method name?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Change it to "shouldExecuteInATMode"

if (!RootContext.inGlobalTransaction() && !RootContext.requireGlobalLock()) {
return false;
}
if (RootContext.inGlobalTransaction()) {
String branchType = RootContext.getBranchType();
if (StringUtils.equals(BranchType.TCC.name(), branchType) || StringUtils.equals(BranchType.SAGA.name(), branchType)) {
return false;
}
}
return true;
}
}
Loading