Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf: optimize the performance of XA transactions #554

Merged
merged 3 commits into from May 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
55 changes: 2 additions & 53 deletions pkg/datasource/sql/connector.go
Expand Up @@ -20,14 +20,11 @@ package sql
import (
"context"
"database/sql/driver"
"errors"
"github.com/go-sql-driver/mysql"
"io"
"reflect"
"sync"

"github.com/go-sql-driver/mysql"

"github.com/seata/seata-go/pkg/datasource/sql/types"
"github.com/seata/seata-go/pkg/util/log"
)

type seataATConnector struct {
Expand Down Expand Up @@ -117,16 +114,6 @@ func (c *seataConnector) Connect(ctx context.Context) (driver.Conn, error) {
if err != nil {
return nil, err
}

// get the version of mysql for xa.
if c.transType == types.XAMode {
version, err := c.dbVersion(ctx, conn)
if err != nil {
return nil, err
}
c.res.SetDbVersion(version)
}

return &Conn{
targetConn: conn,
res: c.res,
Expand All @@ -137,44 +124,6 @@ func (c *seataConnector) Connect(ctx context.Context) (driver.Conn, error) {
}, nil
}

func (c *seataConnector) dbVersion(ctx context.Context, conn driver.Conn) (string, error) {
queryConn, isQueryContext := conn.(driver.QueryerContext)
if !isQueryContext {
return "", errors.New("get db version error for unexpected driver conn")
}

res, err := queryConn.QueryContext(ctx, "SELECT VERSION()", nil)
if err != nil {
log.Errorf("seata connector get the xa mysql version err:%v", err)
return "", err
}

dest := make([]driver.Value, 1)
var version string

if err = res.Next(dest); err != nil {
if err == io.EOF {
return version, nil
}
return "", err
}
if len(dest) != 1 {
return "", errors.New("get the mysql version is not column 1")
}

switch reflect.TypeOf(dest[0]).Kind() {
case reflect.Slice, reflect.Array:
val := reflect.ValueOf(dest[0]).Bytes()
version = string(val)
case reflect.String:
version = reflect.ValueOf(dest[0]).String()
default:
return "", errors.New("get the mysql version is not a string")
}

return version, nil
}

// Driver returns the underlying Driver of the Connector,
// mainly to maintain compatibility with the Driver method
// on sql.DB.
Expand Down
11 changes: 11 additions & 0 deletions pkg/datasource/sql/db.go
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/seata/seata-go/pkg/datasource/sql/undo"
"github.com/seata/seata-go/pkg/datasource/sql/util"
"github.com/seata/seata-go/pkg/protocol/branch"
"github.com/seata/seata-go/pkg/util/log"
)

type dbOption func(db *DBResource)
Expand Down Expand Up @@ -123,6 +124,16 @@ func (db *DBResource) GetResourceGroupId() string {
}

func (db *DBResource) init() {
ctx := context.Background()
conn, err := db.connector.Connect(ctx)
if err != nil {
log.Errorf("connect: %w", err)
}
version, err := selectDBVersion(ctx, conn)
if err != nil {
log.Errorf("select db version: %w", err)
}
db.SetDbVersion(version)
db.checkDbVersion()
}

Expand Down
58 changes: 53 additions & 5 deletions pkg/datasource/sql/driver.go
Expand Up @@ -23,13 +23,15 @@ import (
"database/sql/driver"
"errors"
"fmt"
"io"
"reflect"
"strings"

"github.com/go-sql-driver/mysql"

"github.com/seata/seata-go/pkg/datasource/sql/datasource"
mysql2 "github.com/seata/seata-go/pkg/datasource/sql/datasource/mysql"
"github.com/seata/seata-go/pkg/datasource/sql/types"
"github.com/seata/seata-go/pkg/datasource/sql/util"
"github.com/seata/seata-go/pkg/protocol/branch"
"github.com/seata/seata-go/pkg/util/log"
)
Expand Down Expand Up @@ -146,20 +148,16 @@ func (d *seataDriver) getOpenConnectorProxy(connector driver.Connector, dbType t
withDBName(cfg.DBName),
withConnector(connector),
}

res, err := newResource(options...)
if err != nil {
log.Errorf("create new resource: %w", err)
return nil, err
}

datasource.RegisterTableCache(types.DBTypeMySQL, mysql2.NewTableMetaInstance(db))

if err = datasource.GetDataSourceManager(d.branchType).RegisterResource(res); err != nil {
log.Errorf("regisiter resource: %w", err)
return nil, err
}

return &seataConnector{
res: res,
target: connector,
Expand Down Expand Up @@ -192,3 +190,53 @@ func parseResourceID(dsn string) string {
}
return strings.ReplaceAll(res, ",", "|")
}

func selectDBVersion(ctx context.Context, conn driver.Conn) (string, error) {
var rowsi driver.Rows
var err error

queryerCtx, ok := conn.(driver.QueryerContext)
var queryer driver.Queryer
if !ok {
queryer, ok = conn.(driver.Queryer)
}
if ok {
rowsi, err = util.CtxDriverQuery(ctx, queryerCtx, queryer, "SELECT VERSION()", nil)
defer func() {
if rowsi != nil {
rowsi.Close()
}
}()
if err != nil {
log.Errorf("ctx driver query: %+v", err)
return "", err
}
} else {
log.Errorf("target conn should been driver.QueryerContext or driver.Queryer")
return "", fmt.Errorf("invalid conn")
}

dest := make([]driver.Value, 1)
var version string
if err = rowsi.Next(dest); err != nil {
if err == io.EOF {
return version, nil
}
return "", err
}
if len(dest) != 1 {
return "", errors.New("get db version is not column 1")
}

switch reflect.TypeOf(dest[0]).Kind() {
case reflect.Slice, reflect.Array:
val := reflect.ValueOf(dest[0]).Bytes()
version = string(val)
case reflect.String:
version = reflect.ValueOf(dest[0]).String()
default:
return "", errors.New("get db version is not a string")
}

return version, nil
}
17 changes: 17 additions & 0 deletions pkg/rm/tcc/fence/fence_driver_tx.go
@@ -1,3 +1,20 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package fence

import (
Expand Down
17 changes: 17 additions & 0 deletions pkg/rm/tcc/fence/fennce_driver_test.go
@@ -1,3 +1,20 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package fence

import (
Expand Down