Skip to content

Commit

Permalink
Feature/tcc async commit (#100)
Browse files Browse the repository at this point in the history
implement async commit for tcc
  • Loading branch information
bohehe committed Jan 24, 2022
1 parent 5d9890d commit 53db855
Show file tree
Hide file tree
Showing 12 changed files with 440 additions and 290 deletions.
341 changes: 217 additions & 124 deletions pkg/apis/seata.pb.go

Large diffs are not rendered by default.

40 changes: 21 additions & 19 deletions pkg/apis/seata.proto
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ message GlobalSession {
// Retrying rollback (since timeout) after a recoverable failure.
TimeoutRollbackRetrying = 7;

// All branches can be async committed. The committing is NOT done yet, but it can be seen as
// All branches can be async committed. The committing is NOT done yet, but it can be seen as
// committed for TM/RM rpc_client.
AsyncCommitting = 8;

Expand Down Expand Up @@ -215,6 +215,7 @@ message BranchSession {
BranchType Type = 7 [(gogoproto.moretags) = "xorm:\"branch_type\""];
BranchStatus Status = 8 [(gogoproto.moretags) = "xorm:\"status\""];
bytes ApplicationData = 9 [(gogoproto.moretags) = "xorm:\"application_data\""];
bool AsyncCommit = 10 [(gogoproto.moretags) = "xorm:\"async_commit\""];
}

message RowLock {
Expand All @@ -227,40 +228,41 @@ message RowLock {
string RowKey = 7 [(gogoproto.moretags) = "xorm:\"row_key\""];
}

// GlobalBeginRequest represents a global transaction begin
// GlobalBeginRequest represents a global transaction begin
message GlobalBeginRequest {
string Addressing = 1;
int32 Timeout = 2;
string TransactionName = 3;
}

// GlobalBeginResponse represents a response to GlobalBeginRequest
// GlobalBeginResponse represents a response to GlobalBeginRequest
message GlobalBeginResponse {
ResultCode ResultCode = 1;
ExceptionCode ExceptionCode = 2;
string Message = 3;
string XID = 4;
}

// BranchRegisterRequest represents a branch transaction join in the global transaction
// BranchRegisterRequest represents a branch transaction join in the global transaction
message BranchRegisterRequest {
string Addressing = 1;
string XID = 2;
string ResourceID = 3;
string LockKey = 4;
BranchSession.BranchType BranchType = 5;
bytes ApplicationData = 6;
bool AsyncCommit = 7;
}

// BranchRegisterResponse represents a response to BranchRegisterRequest
// BranchRegisterResponse represents a response to BranchRegisterRequest
message BranchRegisterResponse {
ResultCode ResultCode = 1;
ExceptionCode ExceptionCode = 2;
string Message = 3;
int64 BranchID = 4;
}

// BranchReportRequest represents a request to report branch transaction execution status
// BranchReportRequest represents a request to report branch transaction execution status
message BranchReportRequest {
string XID = 1;
int64 BranchID = 2;
Expand All @@ -270,7 +272,7 @@ message BranchReportRequest {
bytes ApplicationData = 6;
}

// BranchReportResponse represents a response to BranchReportRequest
// BranchReportResponse represents a response to BranchReportRequest
message BranchReportResponse {
ResultCode ResultCode = 1;
ExceptionCode ExceptionCode = 2;
Expand All @@ -293,60 +295,60 @@ message GlobalLockQueryResponse {
bool Lockable = 4;
}

// GlobalStatusRequest represents a request to query the global transaction status
// GlobalStatusRequest represents a request to query the global transaction status
message GlobalStatusRequest {
string XID = 1;
}

// GlobalStatusResponse represents a response to GlobalStatusRequest
// GlobalStatusResponse represents a response to GlobalStatusRequest
message GlobalStatusResponse {
ResultCode ResultCode = 1;
ExceptionCode ExceptionCode = 2;
string Message = 3;
GlobalSession.GlobalStatus GlobalStatus = 4;
}

// GlobalCommitRequest represents a request to commit global transaction
// GlobalCommitRequest represents a request to commit global transaction
message GlobalCommitRequest {
string XID = 1;
}

// GlobalCommitResponse represents a response to GlobalCommitRequest
// GlobalCommitResponse represents a response to GlobalCommitRequest
message GlobalCommitResponse {
ResultCode ResultCode = 1;
ExceptionCode ExceptionCode = 2;
string Message = 3;
GlobalSession.GlobalStatus GlobalStatus = 4;
}

// GlobalRollbackRequest represents a request to rollback global transaction
// GlobalRollbackRequest represents a request to rollback global transaction
message GlobalRollbackRequest {
string XID = 1;
}

// GlobalRollbackResponse represents a response to GlobalRollbackRequest
// GlobalRollbackResponse represents a response to GlobalRollbackRequest
message GlobalRollbackResponse {
ResultCode ResultCode = 1;
ExceptionCode ExceptionCode = 2;
string Message = 3;
GlobalSession.GlobalStatus GlobalStatus = 4;
}

// GlobalReportRequest represents a request to report global transaction execution status
// GlobalReportRequest represents a request to report global transaction execution status
message GlobalReportRequest {
string XID = 1;
GlobalSession.GlobalStatus GlobalStatus = 2;
}

// GlobalReportResponse represents a response to GlobalReportRequest
// GlobalReportResponse represents a response to GlobalReportRequest
message GlobalReportResponse {
ResultCode ResultCode = 1;
ExceptionCode ExceptionCode = 2;
string Message = 3;
GlobalSession.GlobalStatus GlobalStatus = 4;
}

// BranchCommitRequest represents a request to commit branch transaction
// BranchCommitRequest represents a request to commit branch transaction
message BranchCommitRequest {
string XID = 1;
int64 BranchID = 2;
Expand All @@ -356,7 +358,7 @@ message BranchCommitRequest {
bytes ApplicationData = 6;
}

// BranchCommitResponse represents a response to BranchCommitRequest
// BranchCommitResponse represents a response to BranchCommitRequest
message BranchCommitResponse {
ResultCode ResultCode = 1;
ExceptionCode ExceptionCode = 2;
Expand All @@ -366,7 +368,7 @@ message BranchCommitResponse {
BranchSession.BranchStatus BranchStatus = 6;
}

// BranchCommitRequest represents a request to rollback branch transaction
// BranchCommitRequest represents a request to rollback branch transaction
message BranchRollbackRequest {
string XID = 1;
int64 BranchID = 2;
Expand All @@ -376,7 +378,7 @@ message BranchRollbackRequest {
bytes ApplicationData = 6;
}

// BranchRollbackResponse represents a response to BranchRollbackRequest
// BranchRollbackResponse represents a response to BranchRollbackRequest
message BranchRollbackResponse {
ResultCode ResultCode = 1;
ExceptionCode ExceptionCode = 2;
Expand Down
1 change: 1 addition & 0 deletions pkg/client/base/context/business_action_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,5 @@ type BusinessActionContext struct {
BranchID int64
ActionName string
ActionContext map[string]interface{}
AsyncCommit bool
}
3 changes: 2 additions & 1 deletion pkg/client/rm/resource_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,14 +157,15 @@ func (manager *ResourceManager) branchCommunicate() {
}

func (manager *ResourceManager) BranchRegister(ctx context.Context, xid string, resourceID string,
branchType apis.BranchSession_BranchType, applicationData []byte, lockKeys string) (int64, error) {
branchType apis.BranchSession_BranchType, applicationData []byte, lockKeys string, asyncCommit bool) (int64, error) {
request := &apis.BranchRegisterRequest{
Addressing: manager.addressing,
XID: xid,
ResourceID: resourceID,
LockKey: lockKeys,
BranchType: branchType,
ApplicationData: applicationData,
AsyncCommit: asyncCommit,
}
resp, err := manager.rpcClient.BranchRegister(ctx, request)
if err != nil {
Expand Down
12 changes: 10 additions & 2 deletions pkg/client/tcc/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ var (
)

type TccService interface {
Try(ctx *ctx.BusinessActionContext) (bool, error)
Try(ctx *ctx.BusinessActionContext, async bool) (bool, error)
Confirm(ctx *ctx.BusinessActionContext) bool
Cancel(ctx *ctx.BusinessActionContext) bool
}
Expand Down Expand Up @@ -162,7 +162,15 @@ func doTccActionLogStore(ctx *ctx.BusinessActionContext, resource *TCCResource)
return 0, err
}

branchID, err := rm.GetResourceManager().BranchRegister(ctx.RootContext, ctx.XID, resource.GetResourceID(), resource.GetBranchType(), applicationData, "")
branchID, err := rm.GetResourceManager().BranchRegister(
ctx.RootContext,
ctx.XID,
resource.GetResourceID(),
resource.GetBranchType(),
applicationData,
"",
ctx.AsyncCommit,
)
if err != nil {
log.Errorf("TCC branch Register error, xid: %s", ctx.XID)
return 0, errors.WithStack(err)
Expand Down
2 changes: 1 addition & 1 deletion pkg/tc/model/global_transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func (gt *GlobalTransaction) GetBranch(branchID int64) *apis.BranchSession {

func (gt *GlobalTransaction) CanBeCommittedAsync() bool {
for branchSession := range gt.BranchSessions {
if branchSession.Type == apis.TCC {
if !branchSession.AsyncCommit {
return false
}
}
Expand Down
20 changes: 14 additions & 6 deletions pkg/tc/server/transaction_coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -709,11 +709,11 @@ func (tc *TransactionCoordinator) BranchCommunicate(stream apis.ResourceManagerS
func (tc *TransactionCoordinator) BranchRegister(ctx context.Context, request *apis.BranchRegisterRequest) (*apis.BranchRegisterResponse, error) {
gt := tc.holder.FindGlobalTransaction(request.XID)
if gt == nil {
log.Errorf("could not found global transaction xid = %s", request.XID)
log.Errorf("could not find global transaction xid = %s", request.XID)
return &apis.BranchRegisterResponse{
ResultCode: apis.ResultCodeFailed,
ExceptionCode: apis.GlobalTransactionNotExist,
Message: fmt.Sprintf("could not found global transaction xid = %s", request.XID),
Message: fmt.Sprintf("could not find global transaction xid = %s", request.XID),
}, nil
}

Expand All @@ -722,7 +722,7 @@ func (tc *TransactionCoordinator) BranchRegister(ctx context.Context, request *a
return &apis.BranchRegisterResponse{
ResultCode: apis.ResultCodeFailed,
ExceptionCode: apis.FailedLockGlobalTransaction,
Message: fmt.Sprintf("could not found global transaction xid = %s", request.XID),
Message: fmt.Sprintf("could not find global transaction xid = %s", request.XID),
}, nil
}
if result {
Expand All @@ -743,6 +743,13 @@ func (tc *TransactionCoordinator) BranchRegister(ctx context.Context, request *a
}, nil
}

var asyncCommit bool
if request.BranchType == apis.AT {
asyncCommit = true
} else {
asyncCommit = request.AsyncCommit
}

bs := &apis.BranchSession{
Addressing: request.Addressing,
XID: request.XID,
Expand All @@ -753,6 +760,7 @@ func (tc *TransactionCoordinator) BranchRegister(ctx context.Context, request *a
Type: request.BranchType,
Status: apis.Registered,
ApplicationData: request.ApplicationData,
AsyncCommit: asyncCommit,
}

if bs.Type == apis.AT {
Expand Down Expand Up @@ -793,11 +801,11 @@ func (tc *TransactionCoordinator) BranchRegister(ctx context.Context, request *a
func (tc *TransactionCoordinator) BranchReport(ctx context.Context, request *apis.BranchReportRequest) (*apis.BranchReportResponse, error) {
gt := tc.holder.FindGlobalTransaction(request.XID)
if gt == nil {
log.Errorf("could not found global transaction xid = %s", request.XID)
log.Errorf("could not find global transaction xid = %s", request.XID)
return &apis.BranchReportResponse{
ResultCode: apis.ResultCodeFailed,
ExceptionCode: apis.GlobalTransactionNotExist,
Message: fmt.Sprintf("could not found global transaction xid = %s", request.XID),
Message: fmt.Sprintf("could not find global transaction xid = %s", request.XID),
}, nil
}

Expand All @@ -806,7 +814,7 @@ func (tc *TransactionCoordinator) BranchReport(ctx context.Context, request *api
return &apis.BranchReportResponse{
ResultCode: apis.ResultCodeFailed,
ExceptionCode: apis.BranchTransactionNotExist,
Message: fmt.Sprintf("could not found branch session xid = %s branchID = %d", gt.XID, request.BranchID),
Message: fmt.Sprintf("could not find branch session xid = %s branchID = %d", gt.XID, request.BranchID),
}, nil
}

Expand Down
8 changes: 4 additions & 4 deletions pkg/tc/storage/driver/inmemory/inmemory.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ func (driver *driver) UpdateGlobalSessionStatus(session *apis.GlobalSession, sta
gt.Status = status
return nil
}
return fmt.Errorf("could not found global transaction xid = %s", session.XID)
return fmt.Errorf("could not find global transaction xid = %s", session.XID)
}

// Inactive global session.
Expand All @@ -132,7 +132,7 @@ func (driver *driver) InactiveGlobalSession(session *apis.GlobalSession) error {
gt.Active = false
return nil
}
return fmt.Errorf("could not found global transaction xid = %s", session.XID)
return fmt.Errorf("could not find global transaction xid = %s", session.XID)
}

// Remove global session.
Expand All @@ -149,7 +149,7 @@ func (driver *driver) AddBranchSession(globalSession *apis.GlobalSession, sessio
gt.BranchSessions[session] = true
return nil
}
return fmt.Errorf("could not found global transaction xid = %s", session.XID)
return fmt.Errorf("could not find global transaction xid = %s", session.XID)
}

// Find branch session.
Expand Down Expand Up @@ -195,7 +195,7 @@ func (driver *driver) RemoveBranchSession(globalSession *apis.GlobalSession, ses
delete(gt.BranchSessions, session)
return nil
}
return fmt.Errorf("could not found global transaction xid = %s", session.XID)
return fmt.Errorf("could not find global transaction xid = %s", session.XID)
}

// AcquireLock Acquire lock boolean.
Expand Down

0 comments on commit 53db855

Please sign in to comment.