Skip to content

Commit

Permalink
*: global index support admin check table | index (#53156)
Browse files Browse the repository at this point in the history
ref #52897, close #53019
  • Loading branch information
Defined2014 committed May 13, 2024
1 parent 41ce0a5 commit b1818cd
Show file tree
Hide file tree
Showing 9 changed files with 277 additions and 10 deletions.
6 changes: 6 additions & 0 deletions pkg/ddl/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -2474,6 +2474,12 @@ func (w *cleanUpIndexWorker) BackfillData(handleRange reorgBackfillTask) (taskCt
return nil
})
logSlowOperations(time.Since(oprStartTime), "cleanUpIndexBackfillDataInTxn", 3000)
failpoint.Inject("mockDMLExecution", func(val failpoint.Value) {
//nolint:forcetypeassert
if val.(bool) && MockDMLExecution != nil {
MockDMLExecution()
}
})

return
}
Expand Down
6 changes: 6 additions & 0 deletions pkg/executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -451,6 +451,9 @@ func buildIndexLookUpChecker(b *executorBuilder, p *plannercore.PhysicalIndexLoo
if !e.isCommonHandle() {
fullColLen++
}
if e.index.Global {
fullColLen++
}
e.dagPB.OutputOffsets = make([]uint32, fullColLen)
for i := 0; i < fullColLen; i++ {
e.dagPB.OutputOffsets[i] = uint32(i)
Expand All @@ -470,6 +473,9 @@ func buildIndexLookUpChecker(b *executorBuilder, p *plannercore.PhysicalIndexLoo
if !e.isCommonHandle() {
tps = append(tps, types.NewFieldType(mysql.TypeLonglong))
}
if e.index.Global {
tps = append(tps, types.NewFieldType(mysql.TypeLonglong))
}

e.checkIndexValue = &checkIndexValue{idxColTps: tps}

Expand Down
7 changes: 3 additions & 4 deletions pkg/executor/distsql.go
Original file line number Diff line number Diff line change
Expand Up @@ -594,17 +594,16 @@ func (e *IndexLookUpExecutor) needPartitionHandle(tp getHandleType) (bool, error
outputOffsets := e.tableRequest.OutputOffsets
col = cols[outputOffsets[len(outputOffsets)-1]]

// For TableScan, need partitionHandle in `indexOrder` when e.keepOrder == true
needPartitionHandle = (e.index.Global || e.partitionTableMode) && e.keepOrder
// For TableScan, need partitionHandle in `indexOrder` when e.keepOrder == true or execute `admin check [table|index]` with global index
needPartitionHandle = ((e.index.Global || e.partitionTableMode) && e.keepOrder) || (e.index.Global && e.checkIndexValue != nil)
// no ExtraPidColID here, because TableScan shouldn't contain them.
hasExtraCol = col.ID == model.ExtraPhysTblID
}

// TODO: fix global index related bugs later
// There will be two needPartitionHandle != hasExtraCol situations.
// Only `needPartitionHandle` == true and `hasExtraCol` == false are not allowed.
// `ExtraPhysTblID` will be used in `SelectLock` when `needPartitionHandle` == false and `hasExtraCol` == true.
if needPartitionHandle && !hasExtraCol && !e.index.Global {
if needPartitionHandle && !hasExtraCol {
return needPartitionHandle, errors.Errorf("Internal error, needPartitionHandle != ret, tp(%d)", tp)
}
return needPartitionHandle, nil
Expand Down
7 changes: 6 additions & 1 deletion pkg/executor/test/admintest/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,11 @@ go_test(
"main_test.go",
],
flaky = True,
shard_count = 18,
shard_count = 21,
deps = [
"//pkg/config",
"//pkg/ddl",
"//pkg/ddl/util/callback",
"//pkg/domain",
"//pkg/errno",
"//pkg/executor",
Expand All @@ -22,6 +24,7 @@ go_test(
"//pkg/sessionctx/variable",
"//pkg/table",
"//pkg/table/tables",
"//pkg/tablecodec",
"//pkg/testkit",
"//pkg/testkit/testsetup",
"//pkg/testkit/testutil",
Expand All @@ -32,6 +35,8 @@ go_test(
"//pkg/util/mock",
"//pkg/util/redact",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_failpoint//:failpoint",
"@com_github_stretchr_testify//assert",
"@com_github_stretchr_testify//require",
"@com_github_tikv_client_go_v2//tikv",
"@org_uber_go_goleak//:goleak",
Expand Down
239 changes: 239 additions & 0 deletions pkg/executor/test/admintest/admin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ import (
"time"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/pkg/ddl"
"github.com/pingcap/tidb/pkg/ddl/util/callback"
"github.com/pingcap/tidb/pkg/domain"
mysql "github.com/pingcap/tidb/pkg/errno"
"github.com/pingcap/tidb/pkg/executor"
Expand All @@ -34,6 +37,7 @@ import (
"github.com/pingcap/tidb/pkg/sessionctx/variable"
"github.com/pingcap/tidb/pkg/table"
"github.com/pingcap/tidb/pkg/table/tables"
"github.com/pingcap/tidb/pkg/tablecodec"
"github.com/pingcap/tidb/pkg/testkit"
"github.com/pingcap/tidb/pkg/testkit/testutil"
"github.com/pingcap/tidb/pkg/types"
Expand All @@ -42,6 +46,7 @@ import (
"github.com/pingcap/tidb/pkg/util/logutil/consistency"
"github.com/pingcap/tidb/pkg/util/mock"
"github.com/pingcap/tidb/pkg/util/redact"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
)
Expand Down Expand Up @@ -1773,3 +1778,237 @@ func TestAdminCheckTableErrorLocateForClusterIndex(t *testing.T) {
tk.MustExec("admin check table admin_test")
}
}

func TestAdminCheckGlobalIndex(t *testing.T) {
store, domain := testkit.CreateMockStoreAndDomain(t)

tk := testkit.NewTestKit(t, store)
var enableFastCheck = []bool{false, true}
for _, enabled := range enableFastCheck {
tk.MustExec("use test")
tk.MustExec("drop table if exists admin_test")

tk.MustExec("set tidb_enable_global_index = true")
tk.MustExec(fmt.Sprintf("set tidb_enable_fast_table_check = %v", enabled))

tk.MustExec("create table admin_test (a int, b int, c int, unique key uidx_a(a)) partition by hash(c) partitions 5")
tk.MustExec("insert admin_test values (-10, -20, 1), (-1, -10, 2), (1, 11, 3), (2, 12, 0), (5, 15, -1), (10, 20, -2), (20, 30, -3)")

// Make some corrupted index. Build the index information.
sctx := mock.NewContext()
sctx.Store = store
is := domain.InfoSchema()
dbName := model.NewCIStr("test")
tblName := model.NewCIStr("admin_test")
tbl, err := is.TableByName(dbName, tblName)
require.NoError(t, err)
tblInfo := tbl.Meta()
idxInfo := tblInfo.Indices[0]
require.True(t, idxInfo.Global)
idx := tbl.Indices()[0]
require.NotNil(t, idx)

// Reduce one row of table.
// Index count > table count, (2, 12, 0) is deleted.
txn, err := store.Begin()
require.NoError(t, err)
err = txn.Delete(tablecodec.EncodeRowKey(tblInfo.GetPartitionInfo().Definitions[0].ID, kv.IntHandle(4).Encoded()))
require.NoError(t, err)
err = txn.Commit(context.Background())
require.NoError(t, err)
err = tk.ExecToErr("admin check table admin_test")
require.Error(t, err)
require.True(t, consistency.ErrAdminCheckInconsistent.Equal(err))
require.ErrorContains(t, err, "[admin:8223]data inconsistency in table: admin_test, index: uidx_a, handle: 4, index-values:\"handle: 4, values: [KindInt64 2")

indexOpr := tables.NewIndex(tblInfo.GetPartitionInfo().Definitions[0].ID, tblInfo, idxInfo)
// Remove corresponding index key/value.
// Admin check table will success.
txn, err = store.Begin()
require.NoError(t, err)
err = indexOpr.Delete(tk.Session().GetTableCtx(), txn, []types.Datum{types.NewIntDatum(2)}, kv.IntHandle(4))
require.NoError(t, err)
err = txn.Commit(context.Background())
require.NoError(t, err)
tk.MustExec("admin check table admin_test")

indexOpr = tables.NewIndex(tblInfo.GetPartitionInfo().Definitions[2].ID, tblInfo, idxInfo)

// Reduce one row of index.
// Index count < table count, (-1, -10, 2) is deleted.
txn, err = store.Begin()
require.NoError(t, err)
err = indexOpr.Delete(tk.Session().GetTableCtx(), txn, []types.Datum{types.NewIntDatum(-1)}, kv.IntHandle(2))
require.NoError(t, err)
err = txn.Commit(context.Background())
require.NoError(t, err)
err = tk.ExecToErr("admin check table admin_test")
require.Error(t, err)
require.True(t, consistency.ErrAdminCheckInconsistent.Equal(err))
require.EqualError(t, err, "[admin:8223]data inconsistency in table: admin_test, index: uidx_a, handle: 2, index-values:\"\" != record-values:\"handle: 2, values: [KindInt64 -1]\"")

// Add one row of index with inconsistent value.
// Index count = table count, but data is different.
txn, err = store.Begin()
require.NoError(t, err)
_, err = indexOpr.Create(tk.Session().GetTableCtx(), txn, []types.Datum{types.NewIntDatum(100)}, kv.IntHandle(2), nil)
require.NoError(t, err)
err = txn.Commit(context.Background())
require.NoError(t, err)
err = tk.ExecToErr("admin check table admin_test")
require.Error(t, err)
if !enabled {
require.True(t, consistency.ErrAdminCheckInconsistentWithColInfo.Equal(err))
require.EqualError(t, err, "[executor:8134]data inconsistency in table: admin_test, index: uidx_a, col: a, handle: \"2\", index-values:\"KindInt64 100\" != record-values:\"KindInt64 -1\", compare err:<nil>")
} else {
require.True(t, consistency.ErrAdminCheckInconsistent.Equal(err))
require.EqualError(t, err, "[admin:8223]data inconsistency in table: admin_test, index: uidx_a, handle: 2, index-values:\"handle: 2, values: [KindInt64 100]\" != record-values:\"handle: 2, values: [KindInt64 -1]\"")
}
}
}

func TestAdminCheckGlobalIndexWithClusterIndex(t *testing.T) {
store, domain := testkit.CreateMockStoreAndDomain(t)

tk := testkit.NewTestKit(t, store)

getCommonHandle := func(row int) *kv.CommonHandle {
h, err := codec.EncodeKey(tk.Session().GetSessionVars().StmtCtx.TimeZone(), nil, types.MakeDatums(row)...)
require.NoError(t, err)
ch, err := kv.NewCommonHandle(h)
require.NoError(t, err)
return ch
}

var enableFastCheck = []bool{false, true}
for _, enabled := range enableFastCheck {
tk.MustExec("use test")
tk.MustExec("drop table if exists admin_test")

tk.MustExec("set tidb_enable_global_index = true")
tk.MustExec(fmt.Sprintf("set tidb_enable_fast_table_check = %v", enabled))

tk.MustExec("create table admin_test (a int, b int, c int, unique key uidx_a(a), primary key(c)) partition by hash(c) partitions 5")
tk.MustExec("insert admin_test values (-10, -20, 1), (-1, -10, 2), (1, 11, 3), (2, 12, 0), (5, 15, -1), (10, 20, -2), (20, 30, -3)")

// Make some corrupted index. Build the index information.
sctx := mock.NewContext()
sctx.Store = store
is := domain.InfoSchema()
dbName := model.NewCIStr("test")
tblName := model.NewCIStr("admin_test")
tbl, err := is.TableByName(dbName, tblName)
require.NoError(t, err)
tblInfo := tbl.Meta()
idxInfo := tblInfo.Indices[0]
require.True(t, idxInfo.Global)
df := tblInfo.GetPartitionInfo().Definitions[0]

// Reduce one row of table.
// Index count > table count, (2, 12, 0) is deleted.
txn, err := store.Begin()
require.NoError(t, err)
txn.Delete(tablecodec.EncodeRowKey(df.ID, kv.IntHandle(0).Encoded()))
err = txn.Commit(context.Background())
require.NoError(t, err)
err = tk.ExecToErr("admin check table admin_test")
require.Error(t, err)
require.True(t, consistency.ErrAdminCheckInconsistent.Equal(err))
require.ErrorContains(t, err, "[admin:8223]data inconsistency in table: admin_test, index: uidx_a, handle: 0, index-values:\"handle: 0, values: [KindInt64 2")

indexOpr := tables.NewIndex(tblInfo.GetPartitionInfo().Definitions[0].ID, tblInfo, idxInfo)
// Remove corresponding index key/value.
// Admin check table will success.
txn, err = store.Begin()
require.NoError(t, err)
err = indexOpr.Delete(tk.Session().GetTableCtx(), txn, []types.Datum{types.NewIntDatum(2)}, getCommonHandle(0))
require.NoError(t, err)
err = txn.Commit(context.Background())
require.NoError(t, err)
tk.MustExec("admin check table admin_test")

indexOpr = tables.NewIndex(tblInfo.GetPartitionInfo().Definitions[2].ID, tblInfo, idxInfo)
// Reduce one row of index.
// Index count < table count, (-1, -10, 2) is deleted.
txn, err = store.Begin()
require.NoError(t, err)
err = indexOpr.Delete(tk.Session().GetTableCtx(), txn, []types.Datum{types.NewIntDatum(-1)}, getCommonHandle(2))
require.NoError(t, err)
err = txn.Commit(context.Background())
require.NoError(t, err)
err = tk.ExecToErr("admin check table admin_test")
require.Error(t, err)
require.True(t, consistency.ErrAdminCheckInconsistent.Equal(err))
require.EqualError(t, err, "[admin:8223]data inconsistency in table: admin_test, index: uidx_a, handle: 2, index-values:\"\" != record-values:\"handle: 2, values: [KindInt64 -1]\"")

// Add one row with inconsistent value.
// Index count = table count, but data is different.
txn, err = store.Begin()
require.NoError(t, err)
_, err = indexOpr.Create(tk.Session().GetTableCtx(), txn, []types.Datum{types.NewIntDatum(100)}, getCommonHandle(2), nil)
require.NoError(t, err)
err = txn.Commit(context.Background())
require.NoError(t, err)
err = tk.ExecToErr("admin check table admin_test")
require.Error(t, err)
if !enabled {
require.True(t, consistency.ErrAdminCheckInconsistentWithColInfo.Equal(err))
require.EqualError(t, err, "[executor:8134]data inconsistency in table: admin_test, index: uidx_a, col: a, handle: \"2\", index-values:\"KindInt64 100\" != record-values:\"KindInt64 -1\", compare err:<nil>")
} else {
require.True(t, consistency.ErrAdminCheckInconsistent.Equal(err))
require.EqualError(t, err, "[admin:8223]data inconsistency in table: admin_test, index: uidx_a, handle: 2, index-values:\"handle: 2, values: [KindInt64 100]\" != record-values:\"handle: 2, values: [KindInt64 -1]\"")
}
}
}

func TestAdminCheckGlobalIndexDuringDDL(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)
originalHook := dom.DDL().GetHook()
tk := testkit.NewTestKit(t, store)

var schemaMap = make(map[model.SchemaState]struct{})

hook := &callback.TestDDLCallback{Do: dom}
onJobUpdatedExportedFunc := func(job *model.Job) {
schemaMap[job.SchemaState] = struct{}{}
_, err := tk.Exec("admin check table admin_test")
assert.NoError(t, err)
}
hook.OnJobUpdatedExported.Store(&onJobUpdatedExportedFunc)

// check table after delete some index key/value pairs.
ddl.MockDMLExecution = func() {
_, err := tk.Exec("admin check table admin_test")
assert.NoError(t, err)
}

batchSize := 32
tk.MustExec(fmt.Sprintf("set global tidb_ddl_reorg_batch_size = %d", batchSize))

var enableFastCheck = []bool{false, true}
for _, enabled := range enableFastCheck {
tk.MustExec("use test")
tk.MustExec("drop table if exists admin_test")

tk.MustExec("set tidb_enable_global_index = true")
tk.MustExec(fmt.Sprintf("set tidb_enable_fast_table_check = %v", enabled))

tk.MustExec("create table admin_test (a int, b int, c int, unique key uidx_a(a), primary key(c)) partition by hash(c) partitions 5")
tk.MustExec("insert admin_test values (-10, -20, 1), (-1, -10, 2), (1, 11, 3), (2, 12, 0), (5, 15, -1), (10, 20, -2), (20, 30, -3)")
for i := 1; i <= batchSize*2; i++ {
tk.MustExec(fmt.Sprintf("insert admin_test values (%d, %d, %d)", i*5+1, i, i*5+1))
}

dom.DDL().SetHook(hook)
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/ddl/mockDMLExecution", "1*return(true)->return(false)"))
tk.MustExec("alter table admin_test truncate partition p1")
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/ddl/mockDMLExecution"))
dom.DDL().SetHook(originalHook)

// Should have 3 different schema states, `none`, `deleteOnly`, `deleteReorg`
require.Len(t, schemaMap, 3)
for ss := range schemaMap {
delete(schemaMap, ss)
}
}
}
13 changes: 8 additions & 5 deletions pkg/planner/core/planbuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -1457,7 +1457,7 @@ func (b *PlanBuilder) buildAdmin(ctx context.Context, as *ast.AdminStmt) (base.P

func (b *PlanBuilder) buildPhysicalIndexLookUpReader(_ context.Context, dbName model.CIStr, tbl table.Table, idx *model.IndexInfo) (base.Plan, error) {
tblInfo := tbl.Meta()
physicalID, isPartition := getPhysicalID(tbl)
physicalID, isPartition := getPhysicalID(tbl, idx.Global)
fullExprCols, _, err := expression.TableInfo2SchemaAndNames(b.ctx.GetExprCtx(), dbName, tblInfo)
if err != nil {
return nil, err
Expand Down Expand Up @@ -1531,6 +1531,9 @@ func (b *PlanBuilder) buildPhysicalIndexLookUpReader(_ context.Context, dbName m
}
}
}
if is.Index.Global {
ts.Columns, ts.schema, _ = AddExtraPhysTblIDColumn(b.ctx, ts.Columns, ts.schema)
}

cop := &CopTask{
indexPlan: is,
Expand Down Expand Up @@ -1567,9 +1570,9 @@ func getIndexColsSchema(tblInfo *model.TableInfo, idx *model.IndexInfo, allColSc
return schema
}

func getPhysicalID(t table.Table) (physicalID int64, isPartition bool) {
func getPhysicalID(t table.Table, isGlobalIndex bool) (physicalID int64, isPartition bool) {
tblInfo := t.Meta()
if tblInfo.GetPartitionInfo() != nil {
if !isGlobalIndex && tblInfo.GetPartitionInfo() != nil {
pid := t.(table.PhysicalTable).GetPhysicalID()
return pid, true
}
Expand Down Expand Up @@ -1657,8 +1660,8 @@ func (b *PlanBuilder) buildPhysicalIndexLookUpReaders(ctx context.Context, dbNam
}
}
indexInfos = append(indexInfos, idxInfo)
// For partition tables.
if pi := tbl.Meta().GetPartitionInfo(); pi != nil {
// For partition tables except global index.
if pi := tbl.Meta().GetPartitionInfo(); pi != nil && !idxInfo.Global {
for _, def := range pi.Definitions {
t := tbl.(table.PartitionedTable).GetPartition(def.ID)
reader, err := b.buildPhysicalIndexLookUpReader(ctx, dbName, t, idxInfo)
Expand Down

0 comments on commit b1818cd

Please sign in to comment.