*: global index support index_merge and mem_index_merge #52971

Merged · 13 commits · May 7, 2024
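A global index on a partitioned table covers rows from every partition in a single index, so a partial index scan in an index-merge plan can return handles that belong to different partitions. The core idea of this change is to carry the partition ID with each handle (kv.PartitionHandle) so that the union and intersection steps can deduplicate on the (partition ID, row handle) pair instead of the raw handle. The following is a minimal standalone sketch of that idea, separate from the diff below; it uses the kv helpers that appear in the diff (kv.NewPartitionHandle, kv.NewHandleMap) plus kv.IntHandle for a plain integer row handle, and the partition IDs are made up for illustration.

package main

import (
	"fmt"

	"github.com/pingcap/tidb/pkg/kv"
)

func main() {
	// Two rows in different partitions may share the same integer row ID.
	h1 := kv.NewPartitionHandle(100, kv.IntHandle(42)) // partition with physical ID 100
	h2 := kv.NewPartitionHandle(101, kv.IntHandle(42)) // partition with physical ID 101

	// A single HandleMap keyed on the partition-aware handle keeps them
	// distinct, which is what fetchLoopUnion relies on after this change.
	hMap := kv.NewHandleMap()
	hMap.Set(h1, true)
	if _, ok := hMap.Get(h2); !ok {
		fmt.Println("same row ID, different partitions: both handles are kept")
	}
}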
16 changes: 16 additions & 0 deletions pkg/executor/builder.go
@@ -1353,6 +1353,7 @@ func (b *executorBuilder) buildUnionScanFromReader(reader exec.Executor, v *plan
}
}
}
us.partitionIDMap = x.partitionIDMap
us.conditions, us.conditionsWithVirCol = plannercore.SplitSelCondsWithVirtualColumn(v.Conditions)
us.columns = x.columns
us.table = x.table
@@ -3846,6 +3847,7 @@ func buildNoRangeIndexMergeReader(b *executorBuilder, v *plannercore.PhysicalInd
ts := v.TablePlans[0].(*plannercore.PhysicalTableScan)
isCorColInPartialFilters := make([]bool, 0, partialPlanCount)
isCorColInPartialAccess := make([]bool, 0, partialPlanCount)
hasGlobalIndex := false
for i := 0; i < partialPlanCount; i++ {
var tempReq *tipb.DAGRequest
var err error
@@ -3854,6 +3856,9 @@
tempReq, err = buildIndexReq(b.ctx, is.Index.Columns, ts.HandleCols.NumCols(), v.PartialPlans[i])
descs = append(descs, is.Desc)
indexes = append(indexes, is.Index)
if is.Index.Global {
hasGlobalIndex = true
}
} else {
ts := v.PartialPlans[i][0].(*plannercore.PhysicalTableScan)
tempReq, _, err = buildTableReq(b, len(ts.Columns), v.PartialPlans[i])
@@ -3911,6 +3916,7 @@ func buildNoRangeIndexMergeReader(b *executorBuilder, v *plannercore.PhysicalInd
byItems: v.ByItems,
pushedLimit: v.PushedLimit,
keepOrder: v.KeepOrder,
hasGlobalIndex: hasGlobalIndex,
}
collectTable := false
e.tableRequest.CollectRangeCounts = &collectTable
@@ -3952,10 +3958,14 @@ func (b *executorBuilder) buildIndexMergeReader(v *plannercore.PhysicalIndexMerg
}
ret.ranges = make([][]*ranger.Range, 0, len(v.PartialPlans))
sctx := b.ctx.GetSessionVars().StmtCtx
hasGlobalIndex := false
for i := 0; i < len(v.PartialPlans); i++ {
if is, ok := v.PartialPlans[i][0].(*plannercore.PhysicalIndexScan); ok {
ret.ranges = append(ret.ranges, is.Ranges)
sctx.IndexNames = append(sctx.IndexNames, is.Table.Name.O+":"+is.Index.Name.O)
if is.Index.Global {
hasGlobalIndex = true
}
} else {
ret.ranges = append(ret.ranges, v.PartialPlans[i][0].(*plannercore.PhysicalTableScan).Ranges)
if ret.table.Meta().IsCommonHandle {
@@ -3982,6 +3992,12 @@ func (b *executorBuilder) buildIndexMergeReader(v *plannercore.PhysicalIndexMerg
return nil
}
ret.partitionTableMode, ret.prunedPartitions = true, partitions
if hasGlobalIndex {
ret.partitionIDMap = make(map[int64]struct{})
for _, p := range partitions {
ret.partitionIDMap[p.GetPhysicalID()] = struct{}{}
}
}
return ret
}

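When any partial plan scans a global index, the builder above also records the physical ID of every partition that survived pruning in ret.partitionIDMap. Since a global index contains entries for all partitions, the union scan that consumes it (see the partitionIDMap field in index_merge_reader.go below) needs a way to tell whether an entry's partition is still in play for this query, and the map gives it a cheap membership test. The helper below is a hypothetical sketch of that test, not part of the PR; it only assumes the kv.PartitionHandle type used elsewhere in this change.

package main

import "github.com/pingcap/tidb/pkg/kv"

// keepHandle reports whether a handle read from a global index belongs to a
// partition that is still part of the current query, given the map of
// surviving partition IDs built in buildIndexMergeReader.
func keepHandle(partitionIDMap map[int64]struct{}, h kv.Handle) bool {
	ph, ok := h.(kv.PartitionHandle)
	if !ok {
		// Handles without partition information are not filtered here.
		return true
	}
	_, inUse := partitionIDMap[ph.PartitionID]
	return inUse
}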
63 changes: 42 additions & 21 deletions pkg/executor/index_merge_reader.go
@@ -103,6 +103,8 @@ type IndexMergeReaderExecutor struct {

// columns are only required by union scan.
columns []*model.ColumnInfo
// partitionIDMap is only required by union scan with global index.
partitionIDMap map[int64]struct{}
*dataReaderBuilder

// fields about accessing partition tables
@@ -127,9 +129,6 @@ type IndexMergeReaderExecutor struct {
memTracker *memory.Tracker
paging bool

// checkIndexValue is used to check the consistency of the index data.
*checkIndexValue // nolint:unused

partialPlans [][]base.PhysicalPlan
tblPlans []base.PhysicalPlan
partialNetDataSizes []float64
@@ -146,6 +145,8 @@ type IndexMergeReaderExecutor struct {

// Whether it's intersection or union.
isIntersection bool

hasGlobalIndex bool
}

type indexMergeTableTask struct {
@@ -376,7 +377,10 @@ func (e *IndexMergeReaderExecutor) startPartialIndexWorker(ctx context.Context,
return
}
}

if is.Index.Global {
keyRange, _ := distsql.IndexRangesToKVRanges(e.Ctx().GetDistSQLCtx(), is.Table.ID, is.Index.ID, e.ranges[workID])
keyRanges = [][]kv.KeyRange{keyRange.FirstPartitionRange()}
}
var builder distsql.RequestBuilder
builder.SetDAGRequest(e.dagPBs[workID]).
SetStartTS(e.startTS).
@@ -1191,7 +1195,7 @@ func (w *indexMergeProcessWorker) fetchLoopUnion(ctx context.Context, fetchCh <-
if w.indexMerge.pushedLimit != nil {
pushedLimit = w.indexMerge.pushedLimit.Clone()
}
distinctHandles := make(map[int64]*kv.HandleMap)
hMap := kv.NewHandleMap()
for {
var ok bool
var task *indexMergeTableTask
@@ -1223,19 +1227,12 @@ func (w *indexMergeProcessWorker) fetchLoopUnion(ctx context.Context, fetchCh <-
fhs := make([]kv.Handle, 0, 8)

memTracker.Consume(int64(cap(task.handles) * 8))

var tblID int64
if w.indexMerge.partitionTableMode {
tblID = getPhysicalTableID(task.partitionTable)
} else {
tblID = getPhysicalTableID(w.indexMerge.table)
}
if _, ok := distinctHandles[tblID]; !ok {
distinctHandles[tblID] = kv.NewHandleMap()
}
hMap := distinctHandles[tblID]

for _, h := range handles {
if w.indexMerge.partitionTableMode {
if _, ok := h.(kv.PartitionHandle); !ok {
h = kv.NewPartitionHandle(task.partitionTable.GetPhysicalID(), h)
}
}
if _, ok := hMap.Get(h); !ok {
fhs = append(fhs, h)
hMap.Set(h, true)
Expand Down Expand Up @@ -1359,6 +1356,8 @@ type intersectionProcessWorker struct {
// When rowDelta == memConsumeBatchSize, Consume(memUsage)
rowDelta int64
mapUsageDelta int64

partitionIDMap map[int64]int
}

func (w *intersectionProcessWorker) consumeMemDelta() {
Expand All @@ -1380,9 +1379,22 @@ func (w *intersectionProcessWorker) doIntersectionPerPartition(ctx context.Conte
hMap = kv.NewMemAwareHandleMap[*int]()
w.handleMapsPerWorker[task.parTblIdx] = hMap
}
var mapDelta int64
var rowDelta int64
var mapDelta, rowDelta int64
for _, h := range task.handles {
if w.indexMerge.hasGlobalIndex {
if ph, ok := h.(kv.PartitionHandle); ok {
offset := -1
if v, exists := w.partitionIDMap[ph.PartitionID]; exists {
offset = v
}
if hMap, ok = w.handleMapsPerWorker[offset]; !ok {
hMap = kv.NewMemAwareHandleMap[*int]()
w.handleMapsPerWorker[offset] = hMap
}
} else {
h = kv.NewPartitionHandle(task.partitionTable.GetPhysicalID(), h)
}
}
// Use *int to avoid Get() again.
if cntPtr, ok := hMap.Get(h); ok {
(*cntPtr)++
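The intersection path works by counting: every partial plan contributes its handles, a per-handle counter is incremented, and a handle makes it into the final result only when its count equals the number of partial plans. Below is a standalone sketch of that scheme, separate from the diff; it uses plain int64 row IDs and an ordinary map instead of kv.Handle and the memory-aware handle maps used above.

package main

import "fmt"

// intersect returns the handles that appear in every partial result set,
// mirroring the per-handle counting done in doIntersectionPerPartition.
// It assumes each partial set contains no duplicate handles, which holds
// for the distinct handles produced by a single index scan.
func intersect(partialResults [][]int64) []int64 {
	counts := make(map[int64]int)
	for _, handles := range partialResults {
		for _, h := range handles {
			counts[h]++
		}
	}
	var out []int64
	for h, c := range counts {
		if c == len(partialResults) {
			out = append(out, h)
		}
	}
	return out
}

func main() {
	// Handles matched by two partial index scans; only 3 and 7 match both.
	fmt.Println(intersect([][]int64{{1, 3, 7, 9}, {3, 7, 8}}))
}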
@@ -1525,7 +1537,8 @@ func (w *indexMergeProcessWorker) fetchLoopIntersection(ctx context.Context, fet
batchSize := w.indexMerge.Ctx().GetSessionVars().IndexLookupSize

partCnt := 1
if w.indexMerge.partitionTableMode {
// To avoid multi-threaded access to the handle map, we use only one worker for indexMerge with a global index.
if w.indexMerge.partitionTableMode && !w.indexMerge.hasGlobalIndex {
partCnt = len(w.indexMerge.prunedPartitions)
}
workerCnt := min(partCnt, maxWorkerCnt)
@@ -1536,6 +1549,13 @@
}
})

partitionIDMap := make(map[int64]int)
if w.indexMerge.hasGlobalIndex {
for i, p := range w.indexMerge.prunedPartitions {
partitionIDMap[p.GetPhysicalID()] = i
}
}

workers := make([]*intersectionProcessWorker, 0, workerCnt)
var collectWorker *intersectionCollectWorker
wg := util.WaitGroupWrapper{}
@@ -1566,6 +1586,7 @@ func (w *indexMergeProcessWorker) fetchLoopIntersection(ctx context.Context, fet
indexMerge: w.indexMerge,
memTracker: tracker,
batchSize: batchSize,
partitionIDMap: partitionIDMap,
}
wg.RunWithRecover(func() {
defer trace.StartRegion(ctx, "IndexMergeIntersectionProcessWorker").End()
@@ -1692,7 +1713,7 @@ func (w *partialIndexWorker) needPartitionHandle() (bool, error) {
if needPartitionHandle && !hasExtraCol {
return needPartitionHandle, errors.Errorf("Internal error, needPartitionHandle != ret")
}
return needPartitionHandle, nil
return needPartitionHandle || (col.ID == model.ExtraPidColID), nil
}

func (w *partialIndexWorker) fetchHandles(
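The last hunk makes needPartitionHandle also return true when the scanned index schema carries the extra partition-ID column (model.ExtraPidColID), which is the column a global index scan uses to report the partition each entry came from. Once that partition ID is known, a plain handle can be upgraded to a partition-aware one before it is sent downstream. The snippet below is a hypothetical illustration of that last step, not the executor's code; it assumes the partition ID has already been decoded from the extra column.

package main

import "github.com/pingcap/tidb/pkg/kv"

// toPartitionHandle upgrades a plain handle to a partition-aware one once
// the partition ID carried by the global index entry is known.
func toPartitionHandle(h kv.Handle, physicalPartitionID int64) kv.Handle {
	if _, ok := h.(kv.PartitionHandle); ok {
		return h // already partition-aware
	}
	return kv.NewPartitionHandle(physicalPartitionID, h)
}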