Skip to content

Commit

Permalink
Feat add two phase (#122)
Browse files Browse the repository at this point in the history
* add two phase

* support seata dubbo
  • Loading branch information
luky116 committed Jul 19, 2022
1 parent 5ae2253 commit 1ced5eb
Show file tree
Hide file tree
Showing 33 changed files with 1,809 additions and 324 deletions.
13 changes: 5 additions & 8 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,20 +3,17 @@ module github.com/seata/seata-go
go 1.16

require (
dubbo.apache.org/dubbo-go/v3 v3.0.2-0.20220508105316-b27ec53b7bab
github.com/BurntSushi/toml v1.1.0 // indirect
github.com/apache/dubbo-getty v1.4.8
github.com/dubbogo/gost v1.12.3
github.com/golang/snappy v0.0.4 // indirect
github.com/kr/pretty v0.3.0 // indirect
github.com/apache/dubbo-go-hessian2 v1.11.0
github.com/dubbogo/gost v1.12.5
github.com/natefinch/lumberjack v2.0.0+incompatible
github.com/pkg/errors v0.9.1
github.com/rogpeppe/go-internal v1.8.0 // indirect
github.com/stretchr/testify v1.7.0
github.com/stretchr/testify v1.7.1
go.uber.org/atomic v1.9.0
go.uber.org/zap v1.19.1
go.uber.org/zap v1.21.0
golang.org/x/tools v0.1.11 // indirect
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect
gopkg.in/natefinch/lumberjack.v2 v2.0.0 // indirect
vimagination.zapto.org/byteio v0.0.0-20200222190125-d27cba0f0b10
vimagination.zapto.org/memio v0.0.0-20200222190306-588ebc67b97d // indirect
)
552 changes: 544 additions & 8 deletions go.sum

Large diffs are not rendered by default.

8 changes: 8 additions & 0 deletions pkg/common/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,4 +21,12 @@ const (
StartTime = "action-start-time"
HostName = "host-name"
ActionContext = "actionContext"

SeataXidKey = "SEATA_XID"
XidKey = "TX_XID"
MdcXidKey = "X-TX-XID"
MdcBranchIDKey = "X-TX-BRANCH-ID"
BranchTypeKey = "TX_BRANCH_TYPE"
GlobalLockKey = "TX_LOCK"
SeataFilterKey = "seataDubboFilter"
)
2 changes: 1 addition & 1 deletion pkg/common/log/logging.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (
"errors"
"fmt"
"time"

getty "github.com/apache/dubbo-getty"
"github.com/natefinch/lumberjack"
"go.uber.org/zap"
Expand Down
49 changes: 49 additions & 0 deletions pkg/common/types/types.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package types

import "reflect"

type ReferencedService interface {
Reference() string
}

// GetReference return the reference id of the service.
// If the service implemented the ReferencedService interface,
// it will call the Reference method. If not, it will
// return the struct name as the reference id.
func GetReference(service interface{}) string {
if s, ok := service.(ReferencedService); ok {
return s.Reference()
}
ref := ""
sType := reflect.TypeOf(service)
kind := sType.Kind()
switch kind {
case reflect.Struct:
ref = sType.Name()
case reflect.Ptr:
sName := sType.Elem().Name()
if sName != "" {
ref = sName
} else {
ref = sType.Elem().Field(0).Name
}
}
return ref
}
6 changes: 3 additions & 3 deletions pkg/config/getty_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@ type GettyConfig struct {
// GetDefaultGettyConfig ...
func GetDefaultGettyConfig() GettyConfig {
return GettyConfig{
ReconnectInterval: 0,
ConnectionNum: 2,
ReconnectInterval: 1,
ConnectionNum: 20,
HeartbeatPeriod: 10 * time.Second,
GettySessionParam: GettySessionParam{
CompressEncoding: false,
Expand All @@ -51,7 +51,7 @@ func GetDefaultGettyConfig() GettyConfig {
TCPReadTimeout: time.Second,
TCPWriteTimeout: 5 * time.Second,
WaitTimeout: time.Second,
CronPeriod: time.Second,
CronPeriod: 5 * time.Second,
MaxMsgLen: 4096,
SessionName: "rpc_client",
},
Expand Down
96 changes: 96 additions & 0 deletions pkg/integration/dubbo/dubbo_transaction_filter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package dubbo

import (
"context"
"strings"
"sync"

"dubbo.apache.org/dubbo-go/v3/filter"
"dubbo.apache.org/dubbo-go/v3/protocol"
"github.com/seata/seata-go/pkg/common"
"github.com/seata/seata-go/pkg/common/log"
"github.com/seata/seata-go/pkg/tm"
)

var (
seataFilter *dubboTransactionFilter
once sync.Once
)

type Filter interface {
}

type dubboTransactionFilter struct {
}

func GetDubboTransactionFilter() filter.Filter {
if seataFilter == nil {
once.Do(func() {
seataFilter = &dubboTransactionFilter{}
})
}
return seataFilter
}

func (d *dubboTransactionFilter) Invoke(ctx context.Context, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result {
xid := tm.GetXID(ctx)
rpcXid := d.getRpcXid(invocation)
log.Infof("xid in context is %s, xid in RpcContextis %s", xid, rpcXid)

if xid != "" {
// dubbo go
invocation.SetAttachment(common.SeataXidKey, xid)
// dubbo java
invocation.SetAttachment(common.XidKey, xid)
} else if rpcXid != xid {
ctx = tm.InitSeataContext(ctx)
tm.SetXIDCopy(ctx, rpcXid)
}
return invoker.Invoke(ctx, invocation)
// todo why should unbind xid???
}

func (*dubboTransactionFilter) OnResponse(ctx context.Context, result protocol.Result, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result {
return result
}

func (d *dubboTransactionFilter) getRpcXid(invocation protocol.Invocation) string {
rpcXid := d.getDubboGoRpcXid(invocation)
if rpcXid == "" {
rpcXid = d.getDubboJavaRpcXid(invocation)
}
return rpcXid
}

func (*dubboTransactionFilter) getDubboGoRpcXid(invocation protocol.Invocation) string {
rpcXid := invocation.GetAttachmentWithDefaultValue(common.SeataXidKey, "")
if rpcXid == "" {
rpcXid = invocation.GetAttachmentWithDefaultValue(strings.ToLower(common.SeataXidKey), "")
}
return rpcXid
}

func (*dubboTransactionFilter) getDubboJavaRpcXid(invocation protocol.Invocation) string {
rpcXid := invocation.GetAttachmentWithDefaultValue(common.XidKey, "")
if rpcXid == "" {
rpcXid = invocation.GetAttachmentWithDefaultValue(strings.ToLower(common.XidKey), "")
}
return rpcXid
}
29 changes: 29 additions & 0 deletions pkg/integration/integration.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package integration

import (
"dubbo.apache.org/dubbo-go/v3/common/extension"

"github.com/seata/seata-go/pkg/common"
"github.com/seata/seata-go/pkg/integration/dubbo"
)

func UseDubbo() {
extension.SetFilter(common.SeataFilterKey, dubbo.GetDubboTransactionFilter)
}
2 changes: 1 addition & 1 deletion pkg/remoting/getty/getty_remoting.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import (
)

const (
RPC_REQUEST_TIMEOUT = 30 * time.Second
RPC_REQUEST_TIMEOUT = 5 * time.Second
)

var (
Expand Down
8 changes: 3 additions & 5 deletions pkg/remoting/getty/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,32 +72,30 @@ func (client *gettyClientHandler) OnOpen(session getty.Session) error {
sessionManager.ReleaseGettySession(session)
return
}

//todo
//client.GettySessionOnOpenChannel <- session.RemoteAddr()
}()

return nil
}

func (client *gettyClientHandler) OnError(session getty.Session, err error) {
log.Infof("OnError session{%s} got error{%v}, will be closed.", session.Stat(), err)
sessionManager.ReleaseGettySession(session)
}

func (client *gettyClientHandler) OnClose(session getty.Session) {
log.Infof("OnClose session{%s} is closing......", session.Stat())
sessionManager.ReleaseGettySession(session)
}

func (client *gettyClientHandler) OnMessage(session getty.Session, pkg interface{}) {
ctx := context.Background()
log.Debugf("received message: {%#v}", pkg)

rpcMessage, ok := pkg.(message.RpcMessage)
if !ok {
log.Errorf("received message is not protocol.RpcMessage. pkg: %#v", pkg)
return
}

if mm, ok := rpcMessage.Body.(message.MessageTypeAware); ok {
processor := client.processorTable[mm.GetTypeCode()]
if processor != nil {
Expand All @@ -111,7 +109,7 @@ func (client *gettyClientHandler) OnMessage(session getty.Session, pkg interface
}

func (client *gettyClientHandler) OnCron(session getty.Session) {
GetGettyRemotingClient().SendAsyncRequest(message.HeartBeatMessagePing)
//GetGettyRemotingClient().SendAsyncRequest(message.HeartBeatMessagePing)
}

func (client *gettyClientHandler) RegisterProcessor(msgType message.MessageType, processor processor.RemotingProcessor) {
Expand Down
2 changes: 1 addition & 1 deletion pkg/remoting/getty/rpc_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func (c *RpcClient) init() {
for _, address := range addressList {
gettyClient := getty.NewTCPClient(
getty.WithServerAddress(address),
getty.WithConnectionNumber((int)(c.conf.GettyConfig.ConnectionNum)),
getty.WithConnectionNumber(c.conf.GettyConfig.ConnectionNum),
getty.WithReconnectInterval(c.conf.GettyConfig.ReconnectInterval),
getty.WithClientTaskPool(gxsync.NewTaskPoolSimple(0)),
)
Expand Down
15 changes: 5 additions & 10 deletions pkg/remoting/getty/session_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,17 +27,12 @@ import (

var (
MAX_CHECK_ALIVE_RETRY = 600

CHECK_ALIVE_INTERNAL = 100

allSessions = sync.Map{}

CHECK_ALIVE_INTERNAL = 100
allSessions = sync.Map{}
// serverAddress -> rpc_client.Session -> bool
serverSessions = sync.Map{}

sessionSize int32 = 0

sessionManager = &GettySessionManager{}
serverSessions = sync.Map{}
sessionSize int32 = 0
sessionManager = &GettySessionManager{}
)

type GettySessionManager struct{}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,9 @@ import (
func TestClientHeartBeatProcessor(t *testing.T) {
// testcases
var tests = []struct {
name string // testcase name
rpcMsg message.RpcMessage // rpcMessage case
wantErr bool //want testcase err or not
name string // testcase name
rpcMsg message.RpcMessage // rpcMessage case
wantErr bool //want testcase err or not
}{
{
name: "chb-testcase1",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,9 @@ import (
func TestClientOnResponseProcessor(t *testing.T) {
// testcases
var tests = []struct {
name string // testcase name
rpcMsg message.RpcMessage // rpcMessage case
wantErr bool //want testcase err or not
name string // testcase name
rpcMsg message.RpcMessage // rpcMessage case
wantErr bool //want testcase err or not
}{
{
name: "cor-testcase1-mergeResult",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,9 @@ func TestRmBranchCommitProcessor(t *testing.T) {

// testcases
var tests = []struct {
name string // testcase name
rpcMsg message.RpcMessage // rpcMessage case
wantErr bool //want testcase err or not
name string // testcase name
rpcMsg message.RpcMessage // rpcMessage case
wantErr bool //want testcase err or not
}{
{
name: "rbc-testcase1-failure",
Expand Down
Loading

0 comments on commit 1ced5eb

Please sign in to comment.