Skip to content

Commit

Permalink
util/admin: support admin check table on partition table (pingcap#12796)
Browse files Browse the repository at this point in the history
  • Loading branch information
crazycs520 committed Nov 5, 2019
1 parent 4c7bfb8 commit 2539164
Show file tree
Hide file tree
Showing 8 changed files with 197 additions and 53 deletions.
103 changes: 103 additions & 0 deletions executor/admin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -515,6 +515,109 @@ func (s *testSuite) TestAdminCheckTableFailed(c *C) {
tk.MustExec("admin check table admin_test")
}

func (s *testSuite) TestAdminCheckPartitionTableFailed(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
tk.MustExec("drop table if exists admin_test_p")
tk.MustExec("set @@tidb_enable_table_partition = 1")
tk.MustExec("create table admin_test_p (c1 int key,c2 int,c3 int,index idx(c2)) partition by range (c1) (" +
"partition p0 values less than (1)," +
"partition p1 values less than (2)," +
"partition p2 values less than (3)," +
"partition p3 values less than (4)," +
"partition p4 values less than (5)," +
"partition p5 values less than (maxvalue))")
tk.MustExec("insert admin_test_p (c1, c2, c3) values (0,0,0), (1,1,1),(2,2,2),(3,3,3),(4,4,4),(5,5,5)")
tk.MustExec("admin check table admin_test_p")

// Make some corrupted index. Build the index information.
s.ctx = mock.NewContext()
s.ctx.Store = s.store
is := s.domain.InfoSchema()
dbName := model.NewCIStr("test")
tblName := model.NewCIStr("admin_test_p")
tbl, err := is.TableByName(dbName, tblName)
c.Assert(tbl, NotNil)
c.Assert(err, IsNil)
tblInfo := tbl.Meta()
idxInfo := tblInfo.Indices[0]
sc := s.ctx.GetSessionVars().StmtCtx
tk.Se.GetSessionVars().IndexLookupSize = 3
tk.Se.GetSessionVars().MaxChunkSize = 3

// Reduce one row of index on partitions.
// Table count > index count.
for i := 0; i <= 5; i++ {
partitionIdx := i % len(tblInfo.GetPartitionInfo().Definitions)
indexOpr := tables.NewIndex(tblInfo.GetPartitionInfo().Definitions[partitionIdx].ID, tblInfo, idxInfo)
txn, err := s.store.Begin()
c.Assert(err, IsNil)
err = indexOpr.Delete(sc, txn, types.MakeDatums(i), int64(i))
c.Assert(err, IsNil)
err = txn.Commit(context.Background())
c.Assert(err, IsNil)
err = tk.ExecToErr("admin check table admin_test_p")
c.Assert(err.Error(), Equals, fmt.Sprintf("[executor:8003]admin_test_p err:[admin:1]index:<nil> != record:&admin.RecordData{Handle:%d, Values:[]types.Datum{types.Datum{k:0x1, collation:0x0, decimal:0x0, length:0x0, i:%d, b:[]uint8(nil), x:interface {}(nil)}}}", i, i))
c.Assert(executor.ErrAdminCheckTable.Equal(err), IsTrue)
// TODO: fix admin recover for partition table.
//r := tk.MustQuery("admin recover index admin_test_p idx")
//r.Check(testkit.Rows("0 0"))
//tk.MustExec("admin check table admin_test_p")
// Manual recover index.
txn, err = s.store.Begin()
c.Assert(err, IsNil)
_, err = indexOpr.Create(s.ctx, txn, types.MakeDatums(i), int64(i))
c.Assert(err, IsNil)
err = txn.Commit(context.Background())
tk.MustExec("admin check table admin_test_p")
}

// Add one row of index on partitions.
// Table count < index count.
for i := 0; i <= 5; i++ {
partitionIdx := i % len(tblInfo.GetPartitionInfo().Definitions)
indexOpr := tables.NewIndex(tblInfo.GetPartitionInfo().Definitions[partitionIdx].ID, tblInfo, idxInfo)
txn, err := s.store.Begin()
c.Assert(err, IsNil)
_, err = indexOpr.Create(s.ctx, txn, types.MakeDatums(i+8), int64(i+8))
c.Assert(err, IsNil)
err = txn.Commit(context.Background())
c.Assert(err, IsNil)
err = tk.ExecToErr("admin check table admin_test_p")
c.Assert(err, NotNil)
c.Assert(err.Error(), Equals, fmt.Sprintf("handle %d, index:types.Datum{k:0x1, collation:0x0, decimal:0x0, length:0x0, i:%d, b:[]uint8(nil), x:interface {}(nil)} != record:<nil>", i+8, i+8))
// TODO: fix admin recover for partition table.
txn, err = s.store.Begin()
c.Assert(err, IsNil)
err = indexOpr.Delete(sc, txn, types.MakeDatums(i+8), int64(i+8))
c.Assert(err, IsNil)
err = txn.Commit(context.Background())
tk.MustExec("admin check table admin_test_p")
}

// Table count = index count, but the index value was wrong.
for i := 0; i <= 5; i++ {
partitionIdx := i % len(tblInfo.GetPartitionInfo().Definitions)
indexOpr := tables.NewIndex(tblInfo.GetPartitionInfo().Definitions[partitionIdx].ID, tblInfo, idxInfo)
txn, err := s.store.Begin()
c.Assert(err, IsNil)
_, err = indexOpr.Create(s.ctx, txn, types.MakeDatums(i+8), int64(i))
c.Assert(err, IsNil)
err = txn.Commit(context.Background())
c.Assert(err, IsNil)
err = tk.ExecToErr("admin check table admin_test_p")
c.Assert(err, NotNil)
c.Assert(err.Error(), Equals, fmt.Sprintf("col c2, handle %d, index:types.Datum{k:0x1, collation:0x0, decimal:0x0, length:0x0, i:%d, b:[]uint8(nil), x:interface {}(nil)} != record:types.Datum{k:0x1, collation:0x0, decimal:0x0, length:0x0, i:%d, b:[]uint8(nil), x:interface {}(nil)}", i, i+8, i))
// TODO: fix admin recover for partition table.
txn, err = s.store.Begin()
c.Assert(err, IsNil)
err = indexOpr.Delete(sc, txn, types.MakeDatums(i+8), int64(i))
c.Assert(err, IsNil)
err = txn.Commit(context.Background())
tk.MustExec("admin check table admin_test_p")
}
}

func (s *testSuite) TestAdminCheckTable(c *C) {
// test NULL value.
tk := testkit.NewTestKit(c, s.store)
Expand Down
15 changes: 10 additions & 5 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -332,12 +332,12 @@ func (b *executorBuilder) buildCheckTable(v *plannercore.CheckTable) Executor {
e := &CheckTableExec{
baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ExplainID()),
dbName: v.DBName,
tblInfo: v.TblInfo,
indices: v.Indices,
table: v.Table,
indexInfos: v.IndexInfos,
is: b.is,
srcs: readerExecs,
exitCh: make(chan struct{}),
retCh: make(chan error, len(v.Indices)),
retCh: make(chan error, len(readerExecs)),
}
return e
}
Expand Down Expand Up @@ -1769,12 +1769,17 @@ func buildNoRangeTableReader(b *executorBuilder, v *plannercore.PhysicalTableRea
return nil, errors.Trace(err)
}
ts := v.TablePlans[0].(*plannercore.PhysicalTableScan)
table, _ := b.is.TableByID(ts.Table.ID)
tbl, _ := b.is.TableByID(ts.Table.ID)
isPartition, physicalTableID := ts.IsPartition()
if isPartition {
pt := tbl.(table.PartitionedTable)
tbl = pt.GetPartition(physicalTableID)
}
e := &TableReaderExecutor{
baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ExplainID()),
dagPB: dagReq,
physicalTableID: ts.Table.ID,
table: table,
table: tbl,
keepOrder: ts.KeepOrder,
desc: ts.Desc,
columns: ts.Columns,
Expand Down
64 changes: 40 additions & 24 deletions executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
"github.com/pingcap/tidb/sessionctx/stmtctx"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/table/tables"
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util"
Expand Down Expand Up @@ -431,14 +432,14 @@ func getTableName(is infoschema.InfoSchema, id int64) string {
type CheckTableExec struct {
baseExecutor

dbName string
tblInfo *model.TableInfo
indices []table.Index
srcs []*IndexLookUpExecutor
done bool
is infoschema.InfoSchema
exitCh chan struct{}
retCh chan error
dbName string
table table.Table
indexInfos []*model.IndexInfo
srcs []*IndexLookUpExecutor
done bool
is infoschema.InfoSchema
exitCh chan struct{}
retCh chan error
}

// Open implements the Executor Open interface.
Expand Down Expand Up @@ -466,7 +467,20 @@ func (e *CheckTableExec) Close() error {
return firstErr
}

func (e *CheckTableExec) checkIndexHandle(ctx context.Context, num int, src *IndexLookUpExecutor) error {
func (e *CheckTableExec) checkTableIndexHandle(ctx context.Context, idxInfo *model.IndexInfo) error {
// For partition table, there will be multi same index indexLookUpReaders on different partitions.
for _, src := range e.srcs {
if src.index.Name.L == idxInfo.Name.L {
err := e.checkIndexHandle(ctx, src)
if err != nil {
return err
}
}
}
return nil
}

func (e *CheckTableExec) checkIndexHandle(ctx context.Context, src *IndexLookUpExecutor) error {
cols := src.schema.Columns
retFieldTypes := make([]*types.FieldType, len(cols))
for i := range cols {
Expand Down Expand Up @@ -507,20 +521,19 @@ func (e *CheckTableExec) Next(ctx context.Context, req *chunk.Chunk) error {
}
defer func() { e.done = true }()

idxNames := make([]string, 0, len(e.indices))
for _, idx := range e.indices {
idxNames = append(idxNames, idx.Meta().Name.O)
idxNames := make([]string, 0, len(e.indexInfos))
for _, idx := range e.indexInfos {
idxNames = append(idxNames, idx.Name.O)
}
greater, idxOffset, err := admin.CheckIndicesCount(e.ctx, e.dbName, e.tblInfo.Name.O, idxNames)
greater, idxOffset, err := admin.CheckIndicesCount(e.ctx, e.dbName, e.table.Meta().Name.O, idxNames)
if err != nil {
tbl := e.srcs[idxOffset].table
if greater == admin.IdxCntGreater {
err = e.checkIndexHandle(ctx, idxOffset, e.srcs[idxOffset])
err = e.checkTableIndexHandle(ctx, e.indexInfos[idxOffset])
} else if greater == admin.TblCntGreater {
err = e.checkTableRecord(tbl, idxOffset)
err = e.checkTableRecord(idxOffset)
}
if err != nil && admin.ErrDataInConsistent.Equal(err) {
return ErrAdminCheckTable.GenWithStack("%v err:%v", tbl.Meta().Name, err)
return ErrAdminCheckTable.GenWithStack("%v err:%v", e.table.Meta().Name, err)
}
return errors.Trace(err)
}
Expand All @@ -534,7 +547,7 @@ func (e *CheckTableExec) Next(ctx context.Context, req *chunk.Chunk) error {
go func(num int) {
defer wg.Done()
util.WithRecovery(func() {
err1 := e.checkIndexHandle(ctx, num, e.srcs[num])
err1 := e.checkIndexHandle(ctx, e.srcs[num])
if err1 != nil {
logutil.Logger(ctx).Info("check index handle failed", zap.Error(err))
}
Expand All @@ -555,21 +568,24 @@ func (e *CheckTableExec) Next(ctx context.Context, req *chunk.Chunk) error {
return nil
}

func (e *CheckTableExec) checkTableRecord(tbl table.Table, idxOffset int) error {
idx := e.indices[idxOffset]
func (e *CheckTableExec) checkTableRecord(idxOffset int) error {
idxInfo := e.indexInfos[idxOffset]
// TODO: Fix me later, can not use genExprs in indexLookUpReader, because the schema of expression is different.
genExprs := e.srcs[idxOffset].genExprs
txn, err := e.ctx.Txn(true)
if err != nil {
return errors.Trace(err)
}
if tbl.Meta().GetPartitionInfo() == nil {
return admin.CheckRecordAndIndex(e.ctx, txn, tbl, idx, genExprs)
if e.table.Meta().GetPartitionInfo() == nil {
idx := tables.NewIndex(e.table.Meta().ID, e.table.Meta(), idxInfo)
return admin.CheckRecordAndIndex(e.ctx, txn, e.table, idx, genExprs)
}

info := tbl.Meta().GetPartitionInfo()
info := e.table.Meta().GetPartitionInfo()
for _, def := range info.Definitions {
pid := def.ID
partition := tbl.(table.PartitionedTable).GetPartition(pid)
partition := e.table.(table.PartitionedTable).GetPartition(pid)
idx := tables.NewIndex(def.ID, e.table.Meta(), idxInfo)
if err := admin.CheckRecordAndIndex(e.ctx, txn, partition, idx, genExprs); err != nil {
return errors.Trace(err)
}
Expand Down
4 changes: 2 additions & 2 deletions planner/core/common_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,8 @@ type CheckTable struct {
baseSchemaProducer

DBName string
TblInfo *model.TableInfo
Indices []table.Index
Table table.Table
IndexInfos []*model.IndexInfo
IndexLookUpReaders []*PhysicalIndexLookUpReader
}

Expand Down
2 changes: 1 addition & 1 deletion planner/core/exhaust_physical_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -564,7 +564,7 @@ func (p *LogicalJoin) constructInnerIndexScan(ds *DataSource, idx *model.IndexIn
cop.tablePlan = ts
}

is.initSchema(ds.id, idx, cop.tablePlan != nil)
is.initSchema(idx, cop.tablePlan != nil)
indexConds, tblConds := splitIndexFilterConditions(remainedConds, idx.Columns, ds.tableInfo)
path := &accessPath{
indexFilters: indexConds,
Expand Down
4 changes: 2 additions & 2 deletions planner/core/find_best_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -503,7 +503,7 @@ func (ds *DataSource) convertToIndexScan(prop *property.PhysicalProperty, candid
// If it's parent requires double read task, return max cost.
return invalidTask, nil
}
is.initSchema(ds.id, idx, cop.tablePlan != nil)
is.initSchema(idx, cop.tablePlan != nil)
// Only use expectedCnt when it's smaller than the count we calculated.
// e.g. IndexScan(count1)->After Filter(count2). The `ds.stats.RowCount` is count2. count1 is the one we need to calculate
// If expectedCnt and count2 are both zero and we go into the below `if` block, the count1 will be set to zero though it's shouldn't be.
Expand Down Expand Up @@ -542,7 +542,7 @@ func (ds *DataSource) convertToIndexScan(prop *property.PhysicalProperty, candid
}

// TODO: refactor this part, we should not call Clone in fact.
func (is *PhysicalIndexScan) initSchema(id int, idx *model.IndexInfo, isDoubleRead bool) {
func (is *PhysicalIndexScan) initSchema( idx *model.IndexInfo, isDoubleRead bool) {
indexCols := make([]*expression.Column, 0, len(idx.Columns))
for _, col := range idx.Columns {
colFound := is.dataSourceSchema.FindColumnByName(col.Name.L)
Expand Down

0 comments on commit 2539164

Please sign in to comment.