Skip to content

Commit

Permalink
bugfix: fix server encode request error (#2456)
Browse files Browse the repository at this point in the history
  • Loading branch information
slievrly committed Apr 8, 2020
1 parent c5f928f commit 22c7862
Show file tree
Hide file tree
Showing 4 changed files with 62 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import java.nio.ByteBuffer;

import io.netty.buffer.ByteBuf;
import io.seata.common.util.StringUtils;
import io.seata.core.protocol.AbstractResultMessage;
import io.seata.core.protocol.ResultCode;

Expand All @@ -28,6 +29,8 @@
*/
public abstract class AbstractResultMessageCodec extends AbstractMessageCodec {

private static final int MAX_ERR_MSG_LEN = 128;

@Override
public Class<?> getMessageClassType() {
return AbstractResultMessage.class;
Expand All @@ -41,18 +44,14 @@ public <T> void encode(T t, ByteBuf out) {

out.writeByte(resultCode.ordinal());
if (resultCode == ResultCode.Failed) {
if (resultMsg != null) {
if (StringUtils.isNotEmpty(resultMsg)) {
String msg;
if (resultMsg.length() > 128) {
msg = resultMsg.substring(0, 128);
if (resultMsg.length() > MAX_ERR_MSG_LEN) {
msg = resultMsg.substring(0, MAX_ERR_MSG_LEN);
} else {
msg = resultMsg;
}
byte[] bs = msg.getBytes(UTF8);
if (bs.length > 400 && resultMsg.length() > 64) {
msg = resultMsg.substring(0, 64);
bs = msg.getBytes(UTF8);
}
out.writeShort((short)bs.length);
if (bs.length > 0) {
out.writeBytes(bs);
Expand Down
63 changes: 51 additions & 12 deletions server/src/main/java/io/seata/server/AbstractTCInboundHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import io.seata.core.exception.TransactionException;
import io.seata.core.exception.TransactionExceptionCode;
import io.seata.core.model.GlobalStatus;
import io.seata.core.protocol.transaction.AbstractGlobalEndRequest;
import io.seata.core.protocol.transaction.AbstractGlobalEndResponse;
import io.seata.core.protocol.transaction.BranchRegisterRequest;
import io.seata.core.protocol.transaction.BranchRegisterResponse;
import io.seata.core.protocol.transaction.BranchReportRequest;
Expand All @@ -40,6 +42,8 @@
import io.seata.core.rpc.RpcContext;
import io.seata.server.session.GlobalSession;
import io.seata.server.session.SessionHolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* The type Abstract tc inbound handler.
Expand All @@ -48,6 +52,8 @@
*/
public abstract class AbstractTCInboundHandler extends AbstractExceptionHandler implements TCInboundHandler {

private static final Logger LOGGER = LoggerFactory.getLogger(AbstractTCInboundHandler.class);

@Override
public GlobalBeginResponse handle(GlobalBeginRequest request, final RpcContext rpcContext) {
GlobalBeginResponse response = new GlobalBeginResponse();
Expand Down Expand Up @@ -80,6 +86,7 @@ protected abstract void doGlobalBegin(GlobalBeginRequest request, GlobalBeginRes
@Override
public GlobalCommitResponse handle(GlobalCommitRequest request, final RpcContext rpcContext) {
GlobalCommitResponse response = new GlobalCommitResponse();
response.setGlobalStatus(GlobalStatus.Committing);
exceptionHandleTemplate(new AbstractCallback<GlobalCommitRequest, GlobalCommitResponse>() {
@Override
public void execute(GlobalCommitRequest request, GlobalCommitResponse response)
Expand All @@ -92,6 +99,20 @@ public void execute(GlobalCommitRequest request, GlobalCommitResponse response)
e);
}
}
@Override
public void onTransactionException(GlobalCommitRequest request, GlobalCommitResponse response,
TransactionException tex) {
super.onTransactionException(request, response, tex);
checkTransactionStatus(request, response);
}

@Override
public void onException(GlobalCommitRequest request, GlobalCommitResponse response, Exception rex) {
super.onException(request, response, rex);
checkTransactionStatus(request, response);
}


}, request, response);
return response;
}
Expand All @@ -110,6 +131,7 @@ protected abstract void doGlobalCommit(GlobalCommitRequest request, GlobalCommit
@Override
public GlobalRollbackResponse handle(GlobalRollbackRequest request, final RpcContext rpcContext) {
GlobalRollbackResponse response = new GlobalRollbackResponse();
response.setGlobalStatus(GlobalStatus.Rollbacking);
exceptionHandleTemplate(new AbstractCallback<GlobalRollbackRequest, GlobalRollbackResponse>() {
@Override
public void execute(GlobalRollbackRequest request, GlobalRollbackResponse response)
Expand All @@ -127,24 +149,14 @@ public void onTransactionException(GlobalRollbackRequest request, GlobalRollback
TransactionException tex) {
super.onTransactionException(request, response, tex);
// may be appears StoreException outer layer method catch
GlobalSession globalSession = SessionHolder.findGlobalSession(request.getXid(), false);
if (globalSession != null) {
response.setGlobalStatus(globalSession.getStatus());
} else {
response.setGlobalStatus(GlobalStatus.Finished);
}
checkTransactionStatus(request, response);
}

@Override
public void onException(GlobalRollbackRequest request, GlobalRollbackResponse response, Exception rex) {
super.onException(request, response, rex);
// may be appears StoreException outer layer method catch
GlobalSession globalSession = SessionHolder.findGlobalSession(request.getXid(), false);
if (globalSession != null) {
response.setGlobalStatus(globalSession.getStatus());
} else {
response.setGlobalStatus(GlobalStatus.Finished);
}
checkTransactionStatus(request, response);
}
}, request, response);
return response;
Expand Down Expand Up @@ -252,6 +264,7 @@ protected abstract void doLockCheck(GlobalLockQueryRequest request, GlobalLockQu
@Override
public GlobalStatusResponse handle(GlobalStatusRequest request, final RpcContext rpcContext) {
GlobalStatusResponse response = new GlobalStatusResponse();
response.setGlobalStatus(GlobalStatus.UnKnown);
exceptionHandleTemplate(new AbstractCallback<GlobalStatusRequest, GlobalStatusResponse>() {
@Override
public void execute(GlobalStatusRequest request, GlobalStatusResponse response)
Expand All @@ -264,6 +277,19 @@ public void execute(GlobalStatusRequest request, GlobalStatusResponse response)
e);
}
}

@Override
public void onTransactionException(GlobalStatusRequest request, GlobalStatusResponse response,
TransactionException tex) {
super.onTransactionException(request, response, tex);
checkTransactionStatus(request, response);
}

@Override
public void onException(GlobalStatusRequest request, GlobalStatusResponse response, Exception rex) {
super.onException(request, response, rex);
checkTransactionStatus(request, response);
}
}, request, response);
return response;
}
Expand All @@ -282,6 +308,7 @@ protected abstract void doGlobalStatus(GlobalStatusRequest request, GlobalStatus
@Override
public GlobalReportResponse handle(GlobalReportRequest request, final RpcContext rpcContext) {
GlobalReportResponse response = new GlobalReportResponse();
response.setGlobalStatus(request.getGlobalStatus());
exceptionHandleTemplate(new AbstractCallback<GlobalReportRequest, GlobalReportResponse>() {
@Override
public void execute(GlobalReportRequest request, GlobalReportResponse response)
Expand All @@ -303,4 +330,16 @@ public void execute(GlobalReportRequest request, GlobalReportResponse response)
protected abstract void doGlobalReport(GlobalReportRequest request, GlobalReportResponse response,
RpcContext rpcContext) throws TransactionException;

private void checkTransactionStatus(AbstractGlobalEndRequest request, AbstractGlobalEndResponse response) {
try {
GlobalSession globalSession = SessionHolder.findGlobalSession(request.getXid(), false);
if (globalSession != null) {
response.setGlobalStatus(globalSession.getStatus());
} else {
response.setGlobalStatus(GlobalStatus.Finished);
}
} catch (Exception exx) {
LOGGER.error("check transaction status error,{}]", exx.getMessage());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,11 @@ protected void doGlobalBegin(GlobalBeginRequest request, GlobalBeginResponse res
throws TransactionException {
response.setXid(core.begin(rpcContext.getApplicationId(), rpcContext.getTransactionServiceGroup(),
request.getTransactionName(), request.getTimeout()));
if (LOGGER.isInfoEnabled()) {
LOGGER.info("begin new global transaction applicationId: {},transactionServiceGroup:{}, transactionName: "
+ "{},timeout:{},xid:{}", rpcContext.getApplicationId(), rpcContext.getTransactionServiceGroup(),
request.getTransactionName(), request.getTimeout(), response.getXid());
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,6 @@ public String begin(String applicationId, String transactionServiceGroup, String
eventBus.post(new GlobalTransactionEvent(session.getTransactionId(), GlobalTransactionEvent.ROLE_TC,
session.getTransactionName(), session.getBeginTime(), null, session.getStatus()));

LOGGER.info("Successfully begin global transaction xid = {}", session.getXid());
return session.getXid();
}

Expand Down

0 comments on commit 22c7862

Please sign in to comment.