From e037f9aeaf8e24a46e0a0a6706ce246904cd81a0 Mon Sep 17 00:00:00 2001 From: DemanWei <115411915+DemanWei@users.noreply.github.com> Date: Tue, 31 Jan 2023 19:54:08 +0800 Subject: [PATCH] [SQL] Support CHECK TABLE Statement #386 (#608) * [SQL] Support CHECK TABLE Statement #386 * fix problems --- go.mod | 2 +- go.sum | 4 +- pkg/executor/redirect.go | 3 +- pkg/runtime/ast/ast.go | 12 ++ pkg/runtime/ast/check_table.go | 57 ++++++++ pkg/runtime/ast/proto.go | 2 + pkg/runtime/optimize/ddl/check_table.go | 48 +++++++ pkg/runtime/plan/ddl/check_table.go | 174 ++++++++++++++++++++++++ test/integration_test.go | 24 ++++ 9 files changed, 322 insertions(+), 4 deletions(-) create mode 100644 pkg/runtime/ast/check_table.go create mode 100644 pkg/runtime/optimize/ddl/check_table.go create mode 100644 pkg/runtime/plan/ddl/check_table.go diff --git a/go.mod b/go.mod index bad77020..4c8eb028 100644 --- a/go.mod +++ b/go.mod @@ -4,7 +4,7 @@ go 1.18 require ( github.com/appleboy/gin-jwt/v2 v2.9.1 - github.com/arana-db/parser v0.2.9 + github.com/arana-db/parser v0.2.11 github.com/blang/semver v3.5.1+incompatible github.com/bwmarrin/snowflake v0.3.0 github.com/cespare/xxhash/v2 v2.1.2 diff --git a/go.sum b/go.sum index 2afbffd2..aaf359f1 100644 --- a/go.sum +++ b/go.sum @@ -98,8 +98,8 @@ github.com/appleboy/gin-jwt/v2 v2.9.1 h1:l29et8iLW6omcHltsOP6LLk4s3v4g2FbFs0koxG github.com/appleboy/gin-jwt/v2 v2.9.1/go.mod h1:jwcPZJ92uoC9nOUTOKWoN/f6JZOgMSKlFSHw5/FrRUk= github.com/appleboy/gofight/v2 v2.1.2 h1:VOy3jow4vIK8BRQJoC/I9muxyYlJ2yb9ht2hZoS3rf4= github.com/appleboy/gofight/v2 v2.1.2/go.mod h1:frW+U1QZEdDgixycTj4CygQ48yLTUhplt43+Wczp3rw= -github.com/arana-db/parser v0.2.9 h1:Huzd4MybM7bVQUT08TSvzdOxxTExCauSwG65xAxWwrs= -github.com/arana-db/parser v0.2.9/go.mod h1:/XA29bplweWSEAjgoM557ZCzhBilSawUlHcZFjOeDAc= +github.com/arana-db/parser v0.2.11 h1:w1J9hf+5XAFm0+lbggqJRWMaVQ9iOn2Fg6XD+l0wSQ0= +github.com/arana-db/parser v0.2.11/go.mod h1:/XA29bplweWSEAjgoM557ZCzhBilSawUlHcZFjOeDAc= github.com/armon/circbuf v0.0.0-20150827004946-bbbad097214e/go.mod h1:3U/XgcO3hCbHZ8TKRvWD2dDTCfh9M9ya+I9JpbB7O8o= github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da/go.mod h1:Q73ZrmVTwzkszR9V5SSuryQ31EELlFMUz1kKyl939pY= github.com/armon/go-radix v0.0.0-20180808171621-7fddfc383310/go.mod h1:ufUuZ+zHj4x4TnLV4JWEpy2hxWSpsRywHrMgIH9cCH8= diff --git a/pkg/executor/redirect.go b/pkg/executor/redirect.go index a81e3b1a..064296b7 100644 --- a/pkg/executor/redirect.go +++ b/pkg/executor/redirect.go @@ -258,7 +258,8 @@ func (executor *RedirectExecutor) doExecutorComQuery(ctx *proto.Context, act ast } else { err = errNoDatabaseSelected } - case *ast.TruncateTableStmt, *ast.DropTableStmt, *ast.ExplainStmt, *ast.DropIndexStmt, *ast.CreateIndexStmt, *ast.AnalyzeTableStmt, *ast.OptimizeTableStmt: + case *ast.TruncateTableStmt, *ast.DropTableStmt, *ast.ExplainStmt, *ast.DropIndexStmt, *ast.CreateIndexStmt, + *ast.AnalyzeTableStmt, *ast.OptimizeTableStmt, *ast.CheckTableStmt: res, warn, err = executeStmt(ctx, schemaless, rt) case *ast.DropTriggerStmt, *ast.SetStmt, *ast.KillStmt: res, warn, err = rt.Execute(ctx) diff --git a/pkg/runtime/ast/ast.go b/pkg/runtime/ast/ast.go index 599ea656..541e5adc 100644 --- a/pkg/runtime/ast/ast.go +++ b/pkg/runtime/ast/ast.go @@ -119,6 +119,8 @@ func FromStmtNode(node ast.StmtNode) (Statement, error) { return cc.convAnalyzeTable(stmt), nil case *ast.OptimizeTableStmt: return cc.convOptimizeTable(stmt), nil + case *ast.CheckTableStmt: + return cc.convCheckTableStmt(stmt), nil case *ast.KillStmt: return cc.convKill(stmt), nil default: @@ -1637,6 +1639,16 @@ func isColumnAtom(expr PredicateNode) bool { return false } +func (cc *convCtx) convCheckTableStmt(stmt *ast.CheckTableStmt) Statement { + tables := make([]*TableName, len(stmt.Tables)) + for i, table := range stmt.Tables { + tables[i] = &TableName{ + table.Name.String(), + } + } + return &CheckTableStmt{Tables: tables} +} + func (cc *convCtx) convAnalyzeTable(stmt *ast.AnalyzeTableStmt) Statement { tables := make([]*TableName, len(stmt.TableNames)) for i, table := range stmt.TableNames { diff --git a/pkg/runtime/ast/check_table.go b/pkg/runtime/ast/check_table.go new file mode 100644 index 00000000..62a6bf5f --- /dev/null +++ b/pkg/runtime/ast/check_table.go @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package ast + +import ( + "strings" +) + +import ( + "github.com/pkg/errors" +) + +var _ Statement = (*CheckTableStmt)(nil) + +type CheckTableStmt struct { + Tables []*TableName +} + +func NewCheckTableStmt() *CheckTableStmt { + return &CheckTableStmt{} +} + +func (c *CheckTableStmt) CntParams() int { + return 1 +} + +func (c *CheckTableStmt) Restore(flag RestoreFlag, sb *strings.Builder, args *[]int) error { + sb.WriteString("CHECK TABLE ") + for index, table := range c.Tables { + if index != 0 { + sb.WriteString(", ") + } + if err := table.Restore(flag, sb, args); err != nil { + return errors.Wrapf(err, "an error occurred while restore AnalyzeTableStatement.Tables[%d]", index) + } + } + return nil +} + +func (c *CheckTableStmt) Mode() SQLType { + return SQLTypeCheckTable +} diff --git a/pkg/runtime/ast/proto.go b/pkg/runtime/ast/proto.go index ed371fd2..5a7e2fb2 100644 --- a/pkg/runtime/ast/proto.go +++ b/pkg/runtime/ast/proto.go @@ -58,6 +58,7 @@ const ( SQLTypeShowProcessList // SHOW PROCESSLIST SQLTypeShowReplicaStatus // SHOW REPLICA STATUS SQLTypeKill // KILL + SQLTypeCheckTable // CHECK TABLE ) var _sqlTypeNames = [...]string{ @@ -91,6 +92,7 @@ var _sqlTypeNames = [...]string{ SQLTypeShowProcessList: "SHOW PROCESSLIST", SQLTypeShowReplicaStatus: "SHOW REPLICA STATUS", SQLTypeKill: "KILL", + SQLTypeCheckTable: "CHECK TABLE", } // SQLType represents the type of SQL. diff --git a/pkg/runtime/optimize/ddl/check_table.go b/pkg/runtime/optimize/ddl/check_table.go new file mode 100644 index 00000000..b07db450 --- /dev/null +++ b/pkg/runtime/optimize/ddl/check_table.go @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package ddl + +import ( + "context" + "github.com/arana-db/arana/pkg/runtime/plan/ddl" +) + +import ( + "github.com/arana-db/arana/pkg/proto" + "github.com/arana-db/arana/pkg/proto/rule" + "github.com/arana-db/arana/pkg/runtime/ast" + "github.com/arana-db/arana/pkg/runtime/optimize" +) + +func init() { + optimize.Register(ast.SQLTypeCheckTable, optimizeCheckTable) +} + +func optimizeCheckTable(ctx context.Context, o *optimize.Optimizer) (proto.Plan, error) { + shards := rule.DatabaseTables{} + shardsByName := make(map[string]rule.DatabaseTables) + + for _, table := range o.Rule.VTables() { + shards = table.Topology().Enumerate() + shardsByName[table.Name()] = shards + break + } + + stmt := o.Stmt.(*ast.CheckTableStmt) + return ddl.NewCheckTablePlan(stmt, shards, shardsByName), nil +} diff --git a/pkg/runtime/plan/ddl/check_table.go b/pkg/runtime/plan/ddl/check_table.go new file mode 100644 index 00000000..3176fe08 --- /dev/null +++ b/pkg/runtime/plan/ddl/check_table.go @@ -0,0 +1,174 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package ddl + +import ( + "context" + "strings" +) + +import ( + "github.com/pkg/errors" +) + +import ( + "github.com/arana-db/arana/pkg/dataset" + "github.com/arana-db/arana/pkg/mysql/rows" + "github.com/arana-db/arana/pkg/proto" + "github.com/arana-db/arana/pkg/proto/rule" + "github.com/arana-db/arana/pkg/resultx" + "github.com/arana-db/arana/pkg/runtime/ast" + "github.com/arana-db/arana/pkg/runtime/plan" +) + +type CheckTablePlan struct { + plan.BasePlan + Stmt *ast.CheckTableStmt + Shards rule.DatabaseTables + ShardsByName map[string]rule.DatabaseTables +} + +func NewCheckTablePlan( + stmt *ast.CheckTableStmt, + shards rule.DatabaseTables, + shardsByName map[string]rule.DatabaseTables, +) *CheckTablePlan { + return &CheckTablePlan{ + Stmt: stmt, + Shards: shards, + ShardsByName: shardsByName, + } +} + +// Type get plan type +func (c *CheckTablePlan) Type() proto.PlanType { + return proto.PlanTypeQuery +} + +func (c *CheckTablePlan) ExecIn(ctx context.Context, conn proto.VConn) (proto.Result, error) { + var ( + sb strings.Builder + args []int + ) + + ctx, span := plan.Tracer.Start(ctx, "CheckTable.ExecIn") + defer span.End() + + // currently, only implemented db0 + db, tb := c.Shards.Smallest() + if db == "" { + return nil, errors.New("no found db") + } + + // deal partition table + c.tableReplace(tb) + + if err := c.Stmt.Restore(ast.RestoreDefault, &sb, &args); err != nil { + return nil, errors.Wrap(err, "failed to execute CHECK TABLE statement") + } + + ret, err := conn.Query(ctx, db, sb.String(), c.ToArgs(args)...) + if err != nil { + return nil, err + } + + ds, err := ret.Dataset() + if err != nil { + return nil, errors.WithStack(err) + } + + fields, err := ds.Fields() + if err != nil { + return nil, errors.WithStack(err) + } + + ds = dataset.Pipe(ds, dataset.Map(nil, func(next proto.Row) (proto.Row, error) { + dest := make([]proto.Value, len(fields)) + if next.Scan(dest) != nil { + return next, nil + } + + // format physical database name to logic database name + if strings.Contains(dest[0].String(), ".") { + dbTable := strings.Split(dest[0].String(), ".") + dbName := dbTable[0] + dbNameIndex := strings.LastIndex(dbTable[0], "_") + if dbNameIndex > 0 { + dbName = dbName[:dbNameIndex] + } + + tbName := dbTable[1] + tbNameIndex := strings.LastIndex(dbTable[1], "_") + if tbNameIndex > 0 { + tbName = tbName[:tbNameIndex] + } + + dest[0] = proto.NewValueString(dbName + "." + tbName) + } + + if next.IsBinary() { + return rows.NewBinaryVirtualRow(fields, dest), nil + } + + return rows.NewTextVirtualRow(fields, dest), nil + })) + + return resultx.New(resultx.WithDataset(ds)), nil +} + +// tableReplace tb physical table name +func (c *CheckTablePlan) tableReplace(tb string) { + if tb == "" { + return + } + + // logical to physical table map + tableMap := c.physicalToLogicTable(tb) + logicTb := tableMap[tb] + + stmt := ast.NewCheckTableStmt() + + for _, table := range c.Stmt.Tables { + if strings.Trim(table.String(), "`") == logicTb { + stmt.Tables = append(stmt.Tables, &ast.TableName{tb}) + } else { + stmt.Tables = append(stmt.Tables, table) + } + } + + c.Stmt = stmt +} + +// physicalToLogicTable logical to physical table map +func (c *CheckTablePlan) physicalToLogicTable(tbName string) map[string]string { + res := make(map[string]string) + +L1: + for logicTableName, shards := range c.ShardsByName { + for _, tbs := range shards { + for _, tb := range tbs { + if tb == tbName { + res[tb] = logicTableName + continue L1 + } + } + } + } + + return res +} diff --git a/test/integration_test.go b/test/integration_test.go index 112c5bbd..372a8054 100644 --- a/test/integration_test.go +++ b/test/integration_test.go @@ -1017,6 +1017,30 @@ func (s *IntegrationSuite) TestAnalyzeTable() { } } +// TestCheckTable +func (s *IntegrationSuite) TestCheckTable() { + var ( + db = s.DB() + t = s.T() + ) + + type tt struct { + sql string + } + + for _, it := range [...]tt{ + {"CHECK TABLE student"}, + {"CHECK TABLE student,departments"}, + {"CHECK TABLE student QUICK"}, + } { + t.Run(it.sql, func(t *testing.T) { + rows, err := db.Query(it.sql) + assert.NoError(t, err) + defer rows.Close() + }) + } +} + // TestOptimizeTable func (s *IntegrationSuite) TestOptimizeTable() { var (