diff --git a/pkg/admin/router/clusters.go b/pkg/admin/router/clusters.go index da8f7051f..80a58b659 100644 --- a/pkg/admin/router/clusters.go +++ b/pkg/admin/router/clusters.go @@ -83,7 +83,7 @@ func CreateCluster(c *gin.Context) error { return exception.Wrap(exception.CodeInvalidParams, err) } - //TODO how to get cluster name? + // TODO how to get cluster name? err := service.UpsertCluster(context.Background(), tenant, "", cluster) if err != nil { return err diff --git a/pkg/admin/router/db_groups.go b/pkg/admin/router/db_groups.go index 590763c05..1fbb99d1e 100644 --- a/pkg/admin/router/db_groups.go +++ b/pkg/admin/router/db_groups.go @@ -62,7 +62,7 @@ func CreateGroup(c *gin.Context) error { func ListGroups(c *gin.Context) error { service := admin.GetService(c) tenantName := c.Param("tenant") - //cluster := c.Param("cluster") + // cluster := c.Param("cluster") clusters, err := service.ListClusters(context.Background(), tenantName) if err != nil { return err diff --git a/pkg/admin/router/misc.go b/pkg/admin/router/misc.go index 22fb3a280..4839626c3 100644 --- a/pkg/admin/router/misc.go +++ b/pkg/admin/router/misc.go @@ -26,8 +26,10 @@ import ( "github.com/pkg/errors" ) -var _tenantNameRegexp *regexp.Regexp -var _tenantNameRegexpOnce sync.Once +var ( + _tenantNameRegexp *regexp.Regexp + _tenantNameRegexpOnce sync.Once +) func validateTenantName(name string) error { _tenantNameRegexpOnce.Do(func() { diff --git a/pkg/boot/boot.go b/pkg/boot/boot.go index 6f69a0e13..0fac27bee 100644 --- a/pkg/boot/boot.go +++ b/pkg/boot/boot.go @@ -202,7 +202,7 @@ func buildNamespace(ctx context.Context, tenant string, provider Discovery, clus return nil, err } - var initCmds = []namespace.Command{ + initCmds := []namespace.Command{ namespace.UpdateSlowLogger(provider.GetOptions().SlowLogPath), namespace.UpdateParameters(cluster.Parameters), namespace.UpdateSlowThreshold(), diff --git a/pkg/boot/discovery.go b/pkg/boot/discovery.go index fbde9772c..825f6377b 100644 --- a/pkg/boot/discovery.go +++ b/pkg/boot/discovery.go @@ -102,57 +102,57 @@ func (fp *discovery) UpsertTenant(ctx context.Context, tenant string, body *Tena } func (fp *discovery) RemoveTenant(ctx context.Context, tenant string) error { - //TODO implement me + // TODO implement me panic("implement me") } func (fp *discovery) UpsertCluster(ctx context.Context, tenant, cluster string, body *ClusterBody) error { - //TODO implement me + // TODO implement me panic("implement me") } func (fp *discovery) RemoveCluster(ctx context.Context, tenant, cluster string) error { - //TODO implement me + // TODO implement me panic("implement me") } func (fp *discovery) UpsertNode(ctx context.Context, tenant, node string, body *NodeBody) error { - //TODO implement me + // TODO implement me panic("implement me") } func (fp *discovery) RemoveNode(ctx context.Context, tenant, node string) error { - //TODO implement me + // TODO implement me panic("implement me") } func (fp *discovery) UpsertGroup(ctx context.Context, tenant, cluster, group string, body *GroupBody) error { - //TODO implement me + // TODO implement me panic("implement me") } func (fp *discovery) RemoveGroup(ctx context.Context, tenant, cluster, group string) error { - //TODO implement me + // TODO implement me panic("implement me") } func (fp *discovery) BindNode(ctx context.Context, tenant, cluster, group, node string) error { - //TODO implement me + // TODO implement me panic("implement me") } func (fp *discovery) UnbindNode(ctx context.Context, tenant, cluster, group, node string) error { - //TODO implement me + // TODO implement me panic("implement me") } func (fp *discovery) UpsertTable(ctx context.Context, tenant, cluster, table string, body *TableBody) error { - //TODO implement me + // TODO implement me panic("implement me") } func (fp *discovery) RemoveTable(ctx context.Context, tenant, cluster, table string) error { - //TODO implement me + // TODO implement me panic("implement me") } @@ -263,7 +263,6 @@ func (fp *discovery) GetCluster(ctx context.Context, tenant, cluster string) (*C } func (fp *discovery) ListTenants(ctx context.Context) ([]string, error) { - return fp.tenantOp.ListTenants(), nil } @@ -349,7 +348,6 @@ func (fp *discovery) ListGroups(ctx context.Context, tenant, cluster string) ([] } func (fp *discovery) ListNodes(ctx context.Context, tenant, cluster, group string) ([]string, error) { - bingo, ok := fp.loadGroup(tenant, cluster, group) if !ok { return nil, nil @@ -393,7 +391,6 @@ func (fp *discovery) ListTables(ctx context.Context, tenant, cluster string) ([] } func (fp *discovery) GetNode(ctx context.Context, tenant, cluster, group, node string) (*config.Node, error) { - op, ok := fp.centers[tenant] if !ok { return nil, ErrorNoTenant diff --git a/pkg/config/api.go b/pkg/config/api.go index 2e98774f8..e68b86dc7 100644 --- a/pkg/config/api.go +++ b/pkg/config/api.go @@ -101,7 +101,7 @@ type ( Options map[string]interface{} `yaml:"options"` } - //TenantOperator actions specific to tenant spaces + // TenantOperator actions specific to tenant spaces TenantOperator interface { io.Closer diff --git a/pkg/config/config.go b/pkg/config/config.go index 2be559bdf..855e99c35 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -57,7 +57,6 @@ type PathInfo struct { } func NewPathInfo(tenant string) *PathInfo { - p := &PathInfo{} p.DefaultTenantBaseConfigPath = PathKey(filepath.Join(string(DefaultRootPath), fmt.Sprintf("tenants/%s", tenant))) @@ -260,7 +259,7 @@ func (tp *tenantOperate) CreateTenant(name string) error { return errors.Wrap(err, "create tenant name") } - //need to insert the relevant configuration data under the relevant tenant + // need to insert the relevant configuration data under the relevant tenant tenantPathInfo := NewPathInfo(name) for i := range tenantPathInfo.ConfigKeyMapping { if err := tp.op.Save(i, []byte("")); err != nil { @@ -390,7 +389,6 @@ type center struct { } func NewCenter(tenant string, op StoreOperator) Center { - p := NewPathInfo(tenant) holders := map[PathKey]*atomic.Value{} @@ -544,7 +542,6 @@ func (c *center) PersistContext(ctx context.Context) error { } func (c *center) doPersist(ctx context.Context, conf *Tenant) error { - configJson, err := json.Marshal(conf) if err != nil { return errors.Wrap(err, "config json.marshal failed") @@ -564,9 +561,8 @@ func (c *center) doPersist(ctx context.Context, conf *Tenant) error { return nil } -//Subscribe +// Subscribe func (c *center) Subscribe(ctx context.Context, et EventType, f callback) context.CancelFunc { - return c.observers.add(et, f) } diff --git a/pkg/config/default.go b/pkg/config/default.go index 9e84f6520..5098ed9f6 100644 --- a/pkg/config/default.go +++ b/pkg/config/default.go @@ -26,9 +26,7 @@ import ( "github.com/arana-db/arana/pkg/util/log" ) -var ( - ErrorNoStoreOperate = errors.New("no store operate") -) +var ErrorNoStoreOperate = errors.New("no store operate") func GetStoreOperate() StoreOperator { return storeOperate diff --git a/pkg/config/diff.go b/pkg/config/diff.go index bbc3d687c..375a4f96b 100644 --- a/pkg/config/diff.go +++ b/pkg/config/diff.go @@ -171,7 +171,6 @@ func (c Clusters) Diff(old Clusters) *ClustersEvent { } func (d *DataSourceCluster) Diff(old *DataSourceCluster) *ClusterEvent { - ret := &ClusterEvent{ Name: d.Name, Type: d.Type, diff --git a/pkg/config/equals.go b/pkg/config/equals.go index 6a12efd43..6a11858fa 100644 --- a/pkg/config/equals.go +++ b/pkg/config/equals.go @@ -23,7 +23,6 @@ import ( ) func (u *User) Equals(o *User) bool { - return u.Username == o.Username && u.Password == o.Password } diff --git a/pkg/config/etcd/etcd.go b/pkg/config/etcd/etcd.go index 26fd8810b..ca27f023b 100644 --- a/pkg/config/etcd/etcd.go +++ b/pkg/config/etcd/etcd.go @@ -38,9 +38,7 @@ import ( "github.com/arana-db/arana/pkg/util/log" ) -var ( - PluginName = "etcd" -) +var PluginName = "etcd" func init() { config.Register(&storeOperate{ @@ -67,7 +65,6 @@ func (c *storeOperate) Init(options map[string]interface{}) error { DialTimeout: 10 * time.Second, DialOptions: []grpc.DialOption{grpc.WithBlock()}, }) - if err != nil { log.Errorf("failed to initialize etcd client error: %s", err.Error()) return err @@ -142,7 +139,6 @@ func (w *etcdWatcher) run(ctx context.Context) { } func (c *storeOperate) Watch(key config.PathKey) (<-chan []byte, error) { - c.lock.Lock() defer c.lock.Unlock() diff --git a/pkg/config/file/file.go b/pkg/config/file/file.go index c644cd755..c6d7e0014 100644 --- a/pkg/config/file/file.go +++ b/pkg/config/file/file.go @@ -180,7 +180,7 @@ func (s *storeOperate) Save(key config.PathKey, val []byte) error { return nil } -//Get +// Get func (s *storeOperate) Get(key config.PathKey) ([]byte, error) { s.lock.RLock() defer s.lock.RUnlock() @@ -201,7 +201,6 @@ func (s *storeOperate) Name() string { } func (s *storeOperate) Close() error { - for i := range s.cancels { s.cancels[i]() } @@ -209,7 +208,7 @@ func (s *storeOperate) Close() error { return nil } -//readFromFile +// readFromFile func (s *storeOperate) readFromFile(path string, cfg *config.Configuration) error { var ( f *os.File @@ -257,9 +256,8 @@ func formatPath(path string) (string, error) { return path, nil } -//watchFileChange +// watchFileChange func (s *storeOperate) watchFileChange(ctx context.Context, path string) { - refreshT := time.NewTicker(30 * time.Second) oldStat, err := os.Stat(path) @@ -292,5 +290,4 @@ func (s *storeOperate) watchFileChange(ctx context.Context, path string) { } } - } diff --git a/pkg/config/nacos/nacos.go b/pkg/config/nacos/nacos.go index aa243d2a0..baaa89af4 100644 --- a/pkg/config/nacos/nacos.go +++ b/pkg/config/nacos/nacos.go @@ -180,7 +180,6 @@ func (s *storeOperate) Get(key config.PathKey) ([]byte, error) { DataId: buildNacosDataId(string(key)), Group: s.groupName, }) - if err != nil { return nil, err } diff --git a/pkg/config/nacos/nacos_test.go b/pkg/config/nacos/nacos_test.go index 6676a7cd4..243197fd4 100644 --- a/pkg/config/nacos/nacos_test.go +++ b/pkg/config/nacos/nacos_test.go @@ -238,5 +238,4 @@ func Test_storeOpertae(t *testing.T) { err = operate.Save(mockPath[newCfg.Data.Tenants[0].Name].DefaultConfigDataShadowRulePath, []byte(" ")) assert.NoError(t, err, "blank string should be success") - } diff --git a/pkg/executor/redirect.go b/pkg/executor/redirect.go index c7ff4a068..c0b88a94a 100644 --- a/pkg/executor/redirect.go +++ b/pkg/executor/redirect.go @@ -120,7 +120,6 @@ func (executor *RedirectExecutor) ExecuteFieldList(ctx *proto.Context) ([]proto. } func (executor *RedirectExecutor) ExecutorComQuery(ctx *proto.Context) (proto.Result, uint16, error) { - var ( schemaless bool // true if schema is not specified err error diff --git a/pkg/proto/runtime.go b/pkg/proto/runtime.go index 58d387c43..b39b61a72 100644 --- a/pkg/proto/runtime.go +++ b/pkg/proto/runtime.go @@ -30,10 +30,17 @@ const ( ) type ( + // VersionSupport provides the version string. + VersionSupport interface { + // Version returns the version. + Version(ctx context.Context) (string, error) + } + // VConn represents a virtual connection which can be used to query/exec from a db. VConn interface { // Query requests a query command. Query(ctx context.Context, db string, query string, args ...interface{}) (Result, error) + // Exec requests a exec command Exec(ctx context.Context, db string, query string, args ...interface{}) (Result, error) } @@ -73,24 +80,36 @@ type ( DB interface { io.Closer Callable + // ID returns the unique id. ID() string + // IdleTimeout returns the idle timeout. IdleTimeout() time.Duration + // MaxCapacity returns the max capacity. MaxCapacity() int + // Capacity returns the capacity. Capacity() int + // Weight returns the weight. Weight() Weight + // SetCapacity sets the capacity. SetCapacity(capacity int) error + // SetMaxCapacity sets the max capacity. SetMaxCapacity(maxCapacity int) error + // SetIdleTimeout sets the idle timeout. SetIdleTimeout(idleTimeout time.Duration) error + // SetWeight sets the weight. SetWeight(weight Weight) error + + // Variable returns the variable value. + Variable(ctx context.Context, name string) (string, error) } // Executable represents an executor which can send sql request. diff --git a/pkg/runtime/ast/expression_atom.go b/pkg/runtime/ast/expression_atom.go index e46f49150..10d1c4072 100644 --- a/pkg/runtime/ast/expression_atom.go +++ b/pkg/runtime/ast/expression_atom.go @@ -47,6 +47,12 @@ var ( _ ExpressionAtom = (*IntervalExpressionAtom)(nil) ) +var _compat80Dict = map[string]string{ + "query_cache_size": "'1048576'", + "query_cache_type": "'OFF'", + "tx_isolation": "@@transaction_isolation", +} + type expressionAtomPhantom struct{} type ExpressionAtom interface { @@ -109,7 +115,18 @@ func (sy *SystemVariableExpressionAtom) Accept(visitor Visitor) (interface{}, er return visitor.VisitAtomSystemVariable(sy) } -func (sy *SystemVariableExpressionAtom) Restore(_ RestoreFlag, sb *strings.Builder, _ *[]int) error { +func (sy *SystemVariableExpressionAtom) IsCompat80() bool { + _, ok := _compat80Dict[sy.Name] + return ok +} + +func (sy *SystemVariableExpressionAtom) Restore(rf RestoreFlag, sb *strings.Builder, _ *[]int) error { + if rf.Has(RestoreCompat80) { + if compat80, ok := _compat80Dict[sy.Name]; ok { + sb.WriteString(compat80) + return nil + } + } sb.WriteString("@@") sb.WriteString(sy.Name) return nil diff --git a/pkg/runtime/ast/restore.go b/pkg/runtime/ast/restore.go index 4651825b6..96ac03aca 100644 --- a/pkg/runtime/ast/restore.go +++ b/pkg/runtime/ast/restore.go @@ -26,6 +26,7 @@ const ( RestoreLowerKeyword RestoreFlag = 1 << iota // force use lower-case keyword RestoreWithoutAlias + RestoreCompat80 ) type RestoreFlag uint32 diff --git a/pkg/runtime/function2/abs_test.go b/pkg/runtime/function2/abs_test.go index d968393e0..00cbb17b0 100644 --- a/pkg/runtime/function2/abs_test.go +++ b/pkg/runtime/function2/abs_test.go @@ -63,5 +63,4 @@ func TestAbs(t *testing.T) { assert.Equal(t, it.out, fmt.Sprint(out)) }) } - } diff --git a/pkg/runtime/optimize/dml/select.go b/pkg/runtime/optimize/dml/select.go index 35f5fcfb3..9d81adcb8 100644 --- a/pkg/runtime/optimize/dml/select.go +++ b/pkg/runtime/optimize/dml/select.go @@ -72,7 +72,16 @@ func optimizeSelect(ctx context.Context, o *optimize.Optimizer) (proto.Plan, err } ret := &dml.SimpleQueryPlan{Stmt: stmt} ret.BindArgs(o.Args) - return ret, nil + + normalizedFields := make([]string, 0, len(stmt.Select)) + for i := range stmt.Select { + normalizedFields = append(normalizedFields, stmt.Select[i].DisplayName()) + } + + return &dml.RenamePlan{ + Plan: ret, + RenameList: normalizedFields, + }, nil } // --- SIMPLE QUERY BEGIN --- @@ -114,7 +123,15 @@ func optimizeSelect(ctx context.Context, o *optimize.Optimizer) (proto.Plan, err } ret.BindArgs(o.Args) - return ret, nil + normalizedFields := make([]string, 0, len(stmt.Select)) + for i := range stmt.Select { + normalizedFields = append(normalizedFields, stmt.Select[i].DisplayName()) + } + + return &dml.RenamePlan{ + Plan: ret, + RenameList: normalizedFields, + }, nil } // Go through first table if no shards matched. diff --git a/pkg/runtime/plan/dal/analyze_table.go b/pkg/runtime/plan/dal/analyze_table.go index 269c9a04a..d5d9391ab 100644 --- a/pkg/runtime/plan/dal/analyze_table.go +++ b/pkg/runtime/plan/dal/analyze_table.go @@ -46,7 +46,8 @@ type AnalyzeTablePlan struct { func NewAnalyzeTablePlan( stmt *ast.AnalyzeTableStatement, shards rule.DatabaseTables, - shardsByName map[string]rule.DatabaseTables) *AnalyzeTablePlan { + shardsByName map[string]rule.DatabaseTables, +) *AnalyzeTablePlan { return &AnalyzeTablePlan{ Stmt: stmt, Shards: shards, diff --git a/pkg/runtime/plan/dml/mapping.go b/pkg/runtime/plan/dml/mapping.go index e3b064a8a..55dab5d9f 100644 --- a/pkg/runtime/plan/dml/mapping.go +++ b/pkg/runtime/plan/dml/mapping.go @@ -233,7 +233,6 @@ func (vt *virtualValueVisitor) VisitFunction(node *ast.Function) (interface{}, e return nil, errors.Wrapf(err, "failed to call function '%s'", funcName) } return res, nil - } func (vt *virtualValueVisitor) VisitAtomNested(node *ast.NestedExpressionAtom) (interface{}, error) { diff --git a/pkg/runtime/plan/dml/simple_select.go b/pkg/runtime/plan/dml/simple_select.go index a19644611..5c5b6f2bf 100644 --- a/pkg/runtime/plan/dml/simple_select.go +++ b/pkg/runtime/plan/dml/simple_select.go @@ -34,7 +34,6 @@ import ( "github.com/arana-db/arana/pkg/runtime/ast" rcontext "github.com/arana-db/arana/pkg/runtime/context" "github.com/arana-db/arana/pkg/runtime/plan" - "github.com/arana-db/arana/pkg/util/log" ) var _ proto.Plan = (*SimpleQueryPlan)(nil) @@ -50,8 +49,30 @@ func (s *SimpleQueryPlan) Type() proto.PlanType { return proto.PlanTypeQuery } +func (s *SimpleQueryPlan) isCompat80Enabled(ctx context.Context, conn proto.VConn) bool { + var ( + frontendVersion string + backendVersion string + ) + + if ver := rcontext.Version(ctx); len(ver) > 0 { + frontendVersion = ver + } + if vs, ok := conn.(proto.VersionSupport); ok { + backendVersion, _ = vs.Version(ctx) + } + + if len(frontendVersion) < 1 || len(backendVersion) < 1 { + return false + } + + // 5.x -> 8.x+ + return backendVersion[0] >= '8' && frontendVersion[0] < '8' +} + func (s *SimpleQueryPlan) ExecIn(ctx context.Context, conn proto.VConn) (proto.Result, error) { var ( + rf = ast.RestoreDefault sb strings.Builder indexes []int res proto.Result @@ -63,7 +84,11 @@ func (s *SimpleQueryPlan) ExecIn(ctx context.Context, conn proto.VConn) (proto.R discard := s.filter() - if err = s.generate(&sb, &indexes); err != nil { + if s.isCompat80Enabled(ctx, conn) { + rf |= ast.RestoreCompat80 + } + + if err = s.generate(rf, &sb, &indexes); err != nil { return nil, errors.Wrap(err, "failed to generate sql") } @@ -72,20 +97,6 @@ func (s *SimpleQueryPlan) ExecIn(ctx context.Context, conn proto.VConn) (proto.R args = s.ToArgs(indexes) ) - version := rcontext.Version(ctx) - if strings.Compare(version, "8.0.0") >= 0 { - argsCacheSize57 := "@@query_cache_size" - argsCacheSize80 := "1048576" - argsCacheType57 := "@@query_cache_type" - argsCacheType80 := "'OFF'" - argsTxIso57 := "@@tx_isolation" - argsTxIso80 := "@@transaction_isolation" - query = strings.NewReplacer( - argsCacheSize57, argsCacheSize80, - argsCacheType57, argsCacheType80, - argsTxIso57, argsTxIso80).Replace(query) - log.Debugf("ComQueryFor8.0: %s", query) - } if res, err = conn.Query(ctx, s.Database, query, args...); err != nil { return nil, errors.WithStack(err) } @@ -139,11 +150,11 @@ func (s *SimpleQueryPlan) resetTable(tgt *ast.SelectStatement, table string) err return nil } -func (s *SimpleQueryPlan) generate(sb *strings.Builder, args *[]int) error { +func (s *SimpleQueryPlan) generate(rf ast.RestoreFlag, sb *strings.Builder, args *[]int) error { switch len(s.Tables) { case 0: // no table reset - if err := s.Stmt.Restore(ast.RestoreDefault, sb, args); err != nil { + if err := s.Stmt.Restore(rf, sb, args); err != nil { return errors.WithStack(err) } case 1: diff --git a/pkg/runtime/rule/shard_expr_type_test.go b/pkg/runtime/rule/shard_expr_type_test.go index 5b88b776d..47be4c264 100644 --- a/pkg/runtime/rule/shard_expr_type_test.go +++ b/pkg/runtime/rule/shard_expr_type_test.go @@ -65,8 +65,8 @@ func TestNumEval(t *testing.T) { evalRes, _ := expr.Eval(test.env) got := "" if len(evalRes.String()) > 0 { - //evalResFloat, _ := strconv.ParseFloat(evalRes.String(), 64) - //got = fmt.Sprintf("%.6g", evalResFloat) + // evalResFloat, _ := strconv.ParseFloat(evalRes.String(), 64) + // got = fmt.Sprintf("%.6g", evalResFloat) got = evalRes.ToIntString() } fmt.Printf("\t%v => %s\n", test.env, got) @@ -189,8 +189,8 @@ func TestCheck(t *testing.T) { if test.want != "" { // got := fmt.Sprintf("%.6g", expr.Eval(test.env)) evalRes, _ := expr.Eval(test.env) - //evalResFloat, _ := strconv.ParseFloat(evalRes.String(), 64) - //got := fmt.Sprintf("%.6g", evalResFloat) + // evalResFloat, _ := strconv.ParseFloat(evalRes.String(), 64) + // got := fmt.Sprintf("%.6g", evalResFloat) got := evalRes.ToIntString() if got != test.want { t.Errorf("%s: %v => %s, want %s", diff --git a/pkg/runtime/runtime.go b/pkg/runtime/runtime.go index b6dcd9089..f82ffb4fb 100644 --- a/pkg/runtime/runtime.go +++ b/pkg/runtime/runtime.go @@ -71,38 +71,6 @@ var Tracer = otel.Tracer("Runtime") var errTxClosed = errors.New("transaction is closed") -func NewAtomDB(node *config.Node) *AtomDB { - if node == nil { - return nil - } - r, w, err := node.GetReadAndWriteWeight() - if err != nil { - return nil - } - db := &AtomDB{ - id: node.Name, - weight: proto.Weight{R: int32(r), W: int32(w)}, - } - - raw, _ := json.Marshal(map[string]interface{}{ - "dsn": fmt.Sprintf("%s:%s@tcp(%s:%d)/%s?%s", node.Username, node.Password, node.Host, node.Port, node.Database, node.Parameters.String()), - }) - connector, err := mysql.NewConnector(raw) - if err != nil { - panic(err) - } - - var ( - capacity = config.GetConnPropCapacity(node.ConnProps, 8) - maxCapacity = config.GetConnPropMaxCapacity(node.ConnProps, 64) - idleTime = config.GetConnPropIdleTime(node.ConnProps, 30*time.Minute) - ) - - db.pool = pools.NewResourcePool(connector.NewBackendConnection, capacity, maxCapacity, idleTime, 1, nil) - - return db -} - // Runtime executes a sql statement. type Runtime interface { proto.Executable @@ -131,9 +99,10 @@ func Unload(schema string) error { } var ( - _ proto.DB = (*AtomDB)(nil) - _ proto.Callable = (*atomTx)(nil) - _ proto.Tx = (*compositeTx)(nil) + _ proto.DB = (*AtomDB)(nil) + _ proto.Callable = (*atomTx)(nil) + _ proto.Tx = (*compositeTx)(nil) + _ proto.VersionSupport = (*compositeTx)(nil) ) type compositeTx struct { @@ -144,6 +113,10 @@ type compositeTx struct { txs map[string]*atomTx } +func (tx *compositeTx) Version(ctx context.Context) (string, error) { + return tx.rt.Version(ctx) +} + func (tx *compositeTx) Query(ctx context.Context, db string, query string, args ...interface{}) (proto.Result, error) { return tx.call(ctx, db, query, args...) } @@ -398,6 +371,8 @@ func (tx *atomTx) dispose() { } type AtomDB struct { + mu sync.Mutex + id string weight proto.Weight @@ -406,6 +381,53 @@ type AtomDB struct { closed atomic.Bool pendingRequests atomic.Int64 + + variables atomic.Value // map[string]string +} + +func NewAtomDB(node *config.Node) *AtomDB { + if node == nil { + return nil + } + r, w, err := node.GetReadAndWriteWeight() + if err != nil { + return nil + } + db := &AtomDB{ + id: node.Name, + weight: proto.Weight{R: int32(r), W: int32(w)}, + } + + raw, _ := json.Marshal(map[string]interface{}{ + "dsn": fmt.Sprintf("%s:%s@tcp(%s:%d)/%s?%s", node.Username, node.Password, node.Host, node.Port, node.Database, node.Parameters.String()), + }) + connector, err := mysql.NewConnector(raw) + if err != nil { + panic(err) + } + + var ( + capacity = config.GetConnPropCapacity(node.ConnProps, 8) + maxCapacity = config.GetConnPropMaxCapacity(node.ConnProps, 64) + idleTime = config.GetConnPropIdleTime(node.ConnProps, 30*time.Minute) + ) + + db.pool = pools.NewResourcePool(connector.NewBackendConnection, capacity, maxCapacity, idleTime, 1, nil) + + // async fetch variables + go func() { + _, _ = db.fetchVariables(context.Background()) + }() + + return db +} + +func (db *AtomDB) Variable(ctx context.Context, name string) (string, error) { + variables, err := db.fetchVariables(ctx) + if err != nil { + return "", perrors.WithStack(err) + } + return variables[name], nil } func (db *AtomDB) begin(ctx context.Context) (*atomTx, error) { @@ -584,8 +606,65 @@ func (db *AtomDB) returnConnection(bc *mysql.BackendConnection) { // log.Infof("^^^^^ return conn: active=%d, available=%d", db.pool.Active(), db.pool.Available()) } +func (db *AtomDB) fetchVariables(ctx context.Context) (map[string]string, error) { + var ( + val map[string]string + ok bool + ) + if val, ok = db.variables.Load().(map[string]string); ok { + return val, nil + } + + db.mu.Lock() + defer db.mu.Unlock() + + if val, ok = db.variables.Load().(map[string]string); ok { + return val, nil + } + + res, _, err := db.Call(ctx, "SHOW VARIABLES") + if err != nil { + return nil, perrors.Wrapf(err, "cannot fetch variables") + } + + ds, err := res.Dataset() + if err != nil { + return nil, perrors.Wrapf(err, "cannot fetch variables") + } + + defer ds.Close() + + var ( + newborn = make(map[string]string) + dest = make([]proto.Value, 2) + row proto.Row + ) + + for { + row, err = ds.Next() + if perrors.Is(err, io.EOF) { + break + } + if err != nil { + return nil, perrors.Wrapf(err, "cannot fetch variables") + } + if err = row.Scan(dest); err != nil { + return nil, perrors.Wrapf(err, "cannot fetch variables") + } + newborn[fmt.Sprint(dest[0])] = fmt.Sprint(dest[1]) + } + + db.variables.Store(newborn) + + return newborn, nil +} + type defaultRuntime namespace.Namespace +func (pi *defaultRuntime) Version(ctx context.Context) (string, error) { + return pi.Namespace().DB0(ctx).Variable(ctx, "version") +} + func (pi *defaultRuntime) Begin(ctx context.Context) (proto.Tx, error) { _, span := Tracer.Start(ctx, "defaultRuntime.Begin") defer span.End() @@ -629,7 +708,7 @@ func (pi *defaultRuntime) Execute(ctx *proto.Context) (res proto.Result, warn ui execStart := time.Now() defer func() { span.End() - var since = time.Since(execStart) + since := time.Since(execStart) metrics.ExecuteDuration.Observe(since.Seconds()) if pi.Namespace().SlowThreshold() != 0 && since > pi.Namespace().SlowThreshold() { pi.Namespace().SlowLogger().Warnf("slow logs elapsed %v sql %s", since, ctx.GetQuery()) diff --git a/pkg/trace/jaeger/jaeger.go b/pkg/trace/jaeger/jaeger.go index 6e2053408..ee740be15 100644 --- a/pkg/trace/jaeger/jaeger.go +++ b/pkg/trace/jaeger/jaeger.go @@ -41,8 +41,7 @@ const ( parentKey = "traceparent" ) -type Jaeger struct { -} +type Jaeger struct{} func init() { trace.RegisterProviders(trace.Jaeger, &Jaeger{}) diff --git a/pkg/util/match/slice_match.go b/pkg/util/match/slice_match.go index 062e6d576..28c0c0d42 100644 --- a/pkg/util/match/slice_match.go +++ b/pkg/util/match/slice_match.go @@ -26,7 +26,6 @@ import ( // Copy from // isEmpty gets whether the specified object is considered empty or not. func isEmpty(object interface{}) bool { - // get nil case out of the way if object == nil { return true diff --git a/test/integration_test.go b/test/integration_test.go index 16bee81a6..e09ef97ba 100644 --- a/test/integration_test.go +++ b/test/integration_test.go @@ -49,7 +49,7 @@ func TestSuite(t *testing.T) { WithMySQLDatabase("employees"), WithConfig("../integration_test/config/db_tbl/config.yaml"), WithScriptPath("../scripts"), - //WithDevMode(), // NOTICE: UNCOMMENT IF YOU WANT TO DEBUG LOCAL ARANA SERVER!!! + // WithDevMode(), // NOTICE: UNCOMMENT IF YOU WANT TO DEBUG LOCAL ARANA SERVER!!! ) suite.Run(t, &IntegrationSuite{su}) } @@ -992,3 +992,24 @@ func (s *IntegrationSuite) TestAnalyzeTable() { }) } } + +func (s *IntegrationSuite) TestCompat80() { + var ( + db = s.DB() + t = s.T() + ) + + type tt struct { + sql string + } + + for _, it := range [...]tt{ + {"select @@query_cache_size,@@query_cache_type,@@tx_isolation"}, + } { + t.Run(it.sql, func(t *testing.T) { + rows, err := db.Query(it.sql) + assert.NoError(t, err) + defer rows.Close() + }) + } +} diff --git a/testdata/mock_runtime.go b/testdata/mock_runtime.go index fd1345ea9..fd24629cd 100644 --- a/testdata/mock_runtime.go +++ b/testdata/mock_runtime.go @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + // Code generated by MockGen. DO NOT EDIT. // Source: github.com/arana-db/arana/pkg/proto (interfaces: VConn,Plan,Optimizer,DB) @@ -356,10 +373,25 @@ func (mr *MockDBMockRecorder) SetWeight(arg0 interface{}) *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetWeight", reflect.TypeOf((*MockDB)(nil).SetWeight), arg0) } +// Variable mocks base method. +func (m *MockDB) Variable(arg0 context.Context, arg1 string) (string, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Variable", arg0, arg1) + ret0, _ := ret[0].(string) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// Variable indicates an expected call of Variable. +func (mr *MockDBMockRecorder) Variable(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Variable", reflect.TypeOf((*MockDB)(nil).Variable), arg0, arg1) +} + // Weight mocks base method. func (m *MockDB) Weight() proto.Weight { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "Ordinal") + ret := m.ctrl.Call(m, "Weight") ret0, _ := ret[0].(proto.Weight) return ret0 } @@ -367,5 +399,5 @@ func (m *MockDB) Weight() proto.Weight { // Weight indicates an expected call of Weight. func (mr *MockDBMockRecorder) Weight() *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Ordinal", reflect.TypeOf((*MockDB)(nil).Weight)) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Weight", reflect.TypeOf((*MockDB)(nil).Weight)) }