*: global index support index_merge and mem_index_merge (pingcap#…
Defined2014 authored and 3AceShowHand committed May 7, 2024
1 parent 8075526 commit 707258f
Showing 10 changed files with 538 additions and 175 deletions.
16 changes: 16 additions & 0 deletions pkg/executor/builder.go
@@ -1355,6 +1355,7 @@ func (b *executorBuilder) buildUnionScanFromReader(reader exec.Executor, v *plan
}
}
}
us.partitionIDMap = x.partitionIDMap
us.conditions, us.conditionsWithVirCol = plannercore.SplitSelCondsWithVirtualColumn(v.Conditions)
us.columns = x.columns
us.table = x.table
@@ -3848,6 +3849,7 @@ func buildNoRangeIndexMergeReader(b *executorBuilder, v *plannercore.PhysicalInd
ts := v.TablePlans[0].(*plannercore.PhysicalTableScan)
isCorColInPartialFilters := make([]bool, 0, partialPlanCount)
isCorColInPartialAccess := make([]bool, 0, partialPlanCount)
hasGlobalIndex := false
for i := 0; i < partialPlanCount; i++ {
var tempReq *tipb.DAGRequest
var err error
@@ -3856,6 +3858,9 @@ func buildNoRangeIndexMergeReader(b *executorBuilder, v *plannercore.PhysicalInd
tempReq, err = buildIndexReq(b.ctx, is.Index.Columns, ts.HandleCols.NumCols(), v.PartialPlans[i])
descs = append(descs, is.Desc)
indexes = append(indexes, is.Index)
if is.Index.Global {
hasGlobalIndex = true
}
} else {
ts := v.PartialPlans[i][0].(*plannercore.PhysicalTableScan)
tempReq, _, err = buildTableReq(b, len(ts.Columns), v.PartialPlans[i])
@@ -3913,6 +3918,7 @@ func buildNoRangeIndexMergeReader(b *executorBuilder, v *plannercore.PhysicalInd
byItems: v.ByItems,
pushedLimit: v.PushedLimit,
keepOrder: v.KeepOrder,
hasGlobalIndex: hasGlobalIndex,
}
collectTable := false
e.tableRequest.CollectRangeCounts = &collectTable
@@ -3954,10 +3960,14 @@ func (b *executorBuilder) buildIndexMergeReader(v *plannercore.PhysicalIndexMerg
}
ret.ranges = make([][]*ranger.Range, 0, len(v.PartialPlans))
sctx := b.ctx.GetSessionVars().StmtCtx
hasGlobalIndex := false
for i := 0; i < len(v.PartialPlans); i++ {
if is, ok := v.PartialPlans[i][0].(*plannercore.PhysicalIndexScan); ok {
ret.ranges = append(ret.ranges, is.Ranges)
sctx.IndexNames = append(sctx.IndexNames, is.Table.Name.O+":"+is.Index.Name.O)
if is.Index.Global {
hasGlobalIndex = true
}
} else {
ret.ranges = append(ret.ranges, v.PartialPlans[i][0].(*plannercore.PhysicalTableScan).Ranges)
if ret.table.Meta().IsCommonHandle {
@@ -3984,6 +3994,12 @@ func (b *executorBuilder) buildIndexMergeReader(v *plannercore.PhysicalIndexMerg
return nil
}
ret.partitionTableMode, ret.prunedPartitions = true, partitions
if hasGlobalIndex {
ret.partitionIDMap = make(map[int64]struct{})
for _, p := range partitions {
ret.partitionIDMap[p.GetPhysicalID()] = struct{}{}
}
}
return ret
}
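The partitionIDMap built above is just a set of the physical IDs of the partitions left after pruning; rows read through a global index can come from any partition of the table, so a reader that honors pruning has to drop handles whose partition is not in the set. A minimal sketch of that idea, with a simplified partitionHandle type standing in for kv.PartitionHandle (illustrative code only, not the TiDB implementation):

```go
package main

import "fmt"

// partitionHandle is a simplified stand-in for kv.PartitionHandle:
// a row handle tagged with the physical ID of the partition it lives in.
type partitionHandle struct {
	PartitionID int64
	Handle      int64
}

// buildPartitionIDSet mirrors the idea behind ret.partitionIDMap: a set of the
// physical IDs of the partitions that survived pruning.
func buildPartitionIDSet(prunedPartitionIDs []int64) map[int64]struct{} {
	set := make(map[int64]struct{}, len(prunedPartitionIDs))
	for _, id := range prunedPartitionIDs {
		set[id] = struct{}{}
	}
	return set
}

// filterByPartition keeps only the handles whose partition survived pruning.
func filterByPartition(handles []partitionHandle, idSet map[int64]struct{}) []partitionHandle {
	kept := handles[:0]
	for _, h := range handles {
		if _, ok := idSet[h.PartitionID]; ok {
			kept = append(kept, h)
		}
	}
	return kept
}

func main() {
	idSet := buildPartitionIDSet([]int64{101, 103}) // partitions kept after pruning
	rows := []partitionHandle{{101, 1}, {102, 2}, {103, 3}}
	fmt.Println(filterByPartition(rows, idSet)) // [{101 1} {103 3}]
}
```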

79 changes: 51 additions & 28 deletions pkg/executor/index_merge_reader.go
@@ -103,12 +103,14 @@ type IndexMergeReaderExecutor struct {

// columns are only required by union scan.
columns []*model.ColumnInfo
// partitionIDMap is only required by union scan with global index.
partitionIDMap map[int64]struct{}
*dataReaderBuilder

// fields about accessing partition tables
partitionTableMode bool // if this IndexMerge is accessing a partition table
prunedPartitions []table.PhysicalTable // pruned partition tables need to access
partitionKeyRanges [][][]kv.KeyRange // [partitionIdx][partialIndex][ranges]
partitionKeyRanges [][][]kv.KeyRange // [partialIndex][partitionIdx][ranges]

// All fields above are immutable.

@@ -127,9 +129,6 @@ type IndexMergeReaderExecutor struct {
memTracker *memory.Tracker
paging bool

// checkIndexValue is used to check the consistency of the index data.
*checkIndexValue // nolint:unused

partialPlans [][]base.PhysicalPlan
tblPlans []base.PhysicalPlan
partialNetDataSizes []float64
@@ -146,6 +145,8 @@ type IndexMergeReaderExecutor struct {

// Whether it's intersection or union.
isIntersection bool

hasGlobalIndex bool
}

type indexMergeTableTask struct {
@@ -182,12 +183,23 @@ func (e *IndexMergeReaderExecutor) Open(_ context.Context) (err error) {
return err
}
} else {
e.partitionKeyRanges = make([][][]kv.KeyRange, len(e.prunedPartitions))
e.partitionKeyRanges = make([][][]kv.KeyRange, len(e.indexes))
tmpPartitionKeyRanges := make([][][]kv.KeyRange, len(e.prunedPartitions))
for i, p := range e.prunedPartitions {
if e.partitionKeyRanges[i], err = e.buildKeyRangesForTable(p); err != nil {
if tmpPartitionKeyRanges[i], err = e.buildKeyRangesForTable(p); err != nil {
return err
}
}
for i, idx := range e.indexes {
if idx != nil && idx.Global {
keyRange, _ := distsql.IndexRangesToKVRanges(e.ctx.GetDistSQLCtx(), e.table.Meta().ID, idx.ID, e.ranges[i])
e.partitionKeyRanges[i] = [][]kv.KeyRange{keyRange.FirstPartitionRange()}
} else {
for _, pKeyRanges := range tmpPartitionKeyRanges {
e.partitionKeyRanges[i] = append(e.partitionKeyRanges[i], pKeyRanges[i])
}
}
}
}
e.finished = make(chan struct{})
e.resultCh = make(chan *indexMergeTableTask, atomic.LoadInt32(&LookupTableTaskChannelSize))
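Note the layout change here: partitionKeyRanges is now indexed as [partialIndex][partitionIdx] rather than [partitionIdx][partialIndex], and a partial plan on a global index gets a single slot of table-level ranges instead of one slot per partition. A rough self-contained sketch of that regrouping, using simplified types (keyRange and regroupKeyRanges are illustrative names, not the distsql/kv API):

```go
package main

import "fmt"

// keyRange is a simplified stand-in for kv.KeyRange.
type keyRange struct{ Start, End string }

// regroupKeyRanges turns perPartition[partitionIdx][partialIdx] into
// perPartial[partialIdx][partitionIdx]. Partial plans flagged as global get a
// single entry holding their table-level ranges instead of per-partition ones.
func regroupKeyRanges(
	perPartition [][][]keyRange, // built once per pruned partition
	globalRanges map[int][]keyRange, // partialIdx -> table-level ranges for global indexes
	partialCnt int,
) [][][]keyRange {
	perPartial := make([][][]keyRange, partialCnt)
	for i := 0; i < partialCnt; i++ {
		if ranges, ok := globalRanges[i]; ok {
			perPartial[i] = [][]keyRange{ranges} // one slot: the index is not partitioned
			continue
		}
		for _, pRanges := range perPartition {
			perPartial[i] = append(perPartial[i], pRanges[i])
		}
	}
	return perPartial
}

func main() {
	perPartition := [][][]keyRange{
		{{{"p0_idx0_a", "p0_idx0_b"}}, {{"p0_idx1_a", "p0_idx1_b"}}}, // partition 0
		{{{"p1_idx0_a", "p1_idx0_b"}}, {{"p1_idx1_a", "p1_idx1_b"}}}, // partition 1
	}
	global := map[int][]keyRange{1: {{"tbl_idx1_a", "tbl_idx1_b"}}} // partial 1 is a global index
	fmt.Println(regroupKeyRanges(perPartition, global, 2))
}
```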
@@ -328,13 +340,10 @@ func (e *IndexMergeReaderExecutor) startPartialIndexWorker(ctx context.Context,

var keyRanges [][]kv.KeyRange
if e.partitionTableMode {
for _, pKeyRanges := range e.partitionKeyRanges { // get all keyRanges related to this PartialIndex
keyRanges = append(keyRanges, pKeyRanges[workID])
}
keyRanges = e.partitionKeyRanges[workID]
} else {
keyRanges = [][]kv.KeyRange{e.keyRanges[workID]}
}

failpoint.Inject("startPartialIndexWorkerErr", func() error {
return errors.New("inject an error before start partialIndexWorker")
})
@@ -376,7 +385,6 @@ func (e *IndexMergeReaderExecutor) startPartialIndexWorker(ctx context.Context,
return
}
}

var builder distsql.RequestBuilder
builder.SetDAGRequest(e.dagPBs[workID]).
SetStartTS(e.startTS).
@@ -1191,7 +1199,7 @@ func (w *indexMergeProcessWorker) fetchLoopUnion(ctx context.Context, fetchCh <-
if w.indexMerge.pushedLimit != nil {
pushedLimit = w.indexMerge.pushedLimit.Clone()
}
distinctHandles := make(map[int64]*kv.HandleMap)
hMap := kv.NewHandleMap()
for {
var ok bool
var task *indexMergeTableTask
@@ -1223,19 +1231,12 @@ func (w *indexMergeProcessWorker) fetchLoopUnion(ctx context.Context, fetchCh <-
fhs := make([]kv.Handle, 0, 8)

memTracker.Consume(int64(cap(task.handles) * 8))

var tblID int64
if w.indexMerge.partitionTableMode {
tblID = getPhysicalTableID(task.partitionTable)
} else {
tblID = getPhysicalTableID(w.indexMerge.table)
}
if _, ok := distinctHandles[tblID]; !ok {
distinctHandles[tblID] = kv.NewHandleMap()
}
hMap := distinctHandles[tblID]

for _, h := range handles {
if w.indexMerge.partitionTableMode {
if _, ok := h.(kv.PartitionHandle); !ok {
h = kv.NewPartitionHandle(task.partitionTable.GetPhysicalID(), h)
}
}
if _, ok := hMap.Get(h); !ok {
fhs = append(fhs, h)
hMap.Set(h, true)
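The union path used to keep one handle map per physical table; since every handle is now partition-aware (handles from a global index already carry a partition ID, the rest are wrapped into kv.PartitionHandle above), one shared map can deduplicate across partitions. A small sketch of that dedup idea with simplified types (handleKey and dedupUnion are stand-ins, not kv.HandleMap):

```go
package main

import "fmt"

// handleKey is a simplified partition-aware handle: equality takes both the
// partition ID and the row handle into account, which is what makes a single
// shared map sufficient for deduplication.
type handleKey struct {
	PartitionID int64
	Handle      int64
}

// dedupUnion merges handle batches keyed by partition ID and keeps the first
// occurrence of each (partition, handle) pair.
func dedupUnion(batches map[int64][]int64) []handleKey {
	seen := make(map[handleKey]bool)
	var out []handleKey
	for pid, handles := range batches {
		for _, h := range handles {
			k := handleKey{PartitionID: pid, Handle: h}
			if !seen[k] {
				seen[k] = true
				out = append(out, k)
			}
		}
	}
	return out
}

func main() {
	batches := map[int64][]int64{
		101: {1, 2, 2},
		102: {1}, // same row handle, different partition: kept separately
	}
	fmt.Println(len(dedupUnion(batches))) // 3
}
```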
Expand Down Expand Up @@ -1359,6 +1360,8 @@ type intersectionProcessWorker struct {
// When rowDelta == memConsumeBatchSize, Consume(memUsage)
rowDelta int64
mapUsageDelta int64

partitionIDMap map[int64]int
}

func (w *intersectionProcessWorker) consumeMemDelta() {
@@ -1380,9 +1383,20 @@ func (w *intersectionProcessWorker) doIntersectionPerPartition(ctx context.Conte
hMap = kv.NewMemAwareHandleMap[*int]()
w.handleMapsPerWorker[task.parTblIdx] = hMap
}
var mapDelta int64
var rowDelta int64
var mapDelta, rowDelta int64
for _, h := range task.handles {
if w.indexMerge.hasGlobalIndex {
if ph, ok := h.(kv.PartitionHandle); ok {
if v, exists := w.partitionIDMap[ph.PartitionID]; exists {
if hMap, ok = w.handleMapsPerWorker[v]; !ok {
hMap = kv.NewMemAwareHandleMap[*int]()
w.handleMapsPerWorker[v] = hMap
}
}
} else {
h = kv.NewPartitionHandle(task.partitionTable.GetPhysicalID(), h)
}
}
// Use *int to avoid Get() again.
if cntPtr, ok := hMap.Get(h); ok {
(*cntPtr)++
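For intersection, a handle coming from a global index is routed to the count map of its own partition via partitionIDMap, and a handle belongs to the result once every partial scan has reported it. A simplified sketch of that counting, assuming each partial scan returns a given handle at most once (the handle type and intersect helper are illustrative only):

```go
package main

import "fmt"

// handle is a simplified partition-aware row handle.
type handle struct {
	PartitionID int64
	Row         int64
}

// intersect counts, per partition, how many partial index scans returned each
// handle; a handle joins the intersection once every partial scan saw it.
// partitionIDMap maps a physical partition ID to its slot, mirroring the new
// intersectionProcessWorker field.
func intersect(batches [][]handle, partitionIDMap map[int64]int, partialCnt int) []handle {
	counts := make([]map[handle]int, len(partitionIDMap))
	for i := range counts {
		counts[i] = make(map[handle]int)
	}
	for _, batch := range batches {
		for _, h := range batch {
			slot, ok := partitionIDMap[h.PartitionID]
			if !ok {
				continue // partition was pruned away
			}
			counts[slot][h]++
		}
	}
	var out []handle
	for _, m := range counts {
		for h, c := range m {
			if c == partialCnt {
				out = append(out, h)
			}
		}
	}
	return out
}

func main() {
	partitionIDMap := map[int64]int{101: 0, 102: 1}
	batches := [][]handle{
		{{101, 1}, {102, 7}}, // results of partial scan 0
		{{101, 1}},           // results of partial scan 1
	}
	fmt.Println(intersect(batches, partitionIDMap, 2)) // [{101 1}]
}
```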
@@ -1525,7 +1539,8 @@ func (w *indexMergeProcessWorker) fetchLoopIntersection(ctx context.Context, fet
batchSize := w.indexMerge.Ctx().GetSessionVars().IndexLookupSize

partCnt := 1
if w.indexMerge.partitionTableMode {
// To avoid multi-threaded access to the handle map, we only use one worker for indexMerge with global index.
if w.indexMerge.partitionTableMode && !w.indexMerge.hasGlobalIndex {
partCnt = len(w.indexMerge.prunedPartitions)
}
workerCnt := min(partCnt, maxWorkerCnt)
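The comment above captures the trade-off: with a global index in the merge, one partial scan can emit handles for many partitions, so per-partition workers would race on the shared handle maps, and a single worker avoids that. A tiny illustration of the worker-count decision (plain Go, the built-in min needs Go 1.21+; names are illustrative):

```go
package main

import "fmt"

// intersectionWorkerCount mirrors the partCnt/workerCnt decision: one worker
// per pruned partition normally, but a single worker as soon as a global index
// is involved, so only one goroutine ever touches the shared handle maps.
func intersectionWorkerCount(partitionTableMode, hasGlobalIndex bool, prunedPartitions, maxWorkerCnt int) int {
	partCnt := 1
	if partitionTableMode && !hasGlobalIndex {
		partCnt = prunedPartitions
	}
	return min(partCnt, maxWorkerCnt)
}

func main() {
	fmt.Println(intersectionWorkerCount(true, false, 8, 4)) // 4
	fmt.Println(intersectionWorkerCount(true, true, 8, 4))  // 1
}
```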
@@ -1536,6 +1551,13 @@ func (w *indexMergeProcessWorker) fetchLoopIntersection(ctx context.Context, fet
}
})

partitionIDMap := make(map[int64]int)
if w.indexMerge.hasGlobalIndex {
for i, p := range w.indexMerge.prunedPartitions {
partitionIDMap[p.GetPhysicalID()] = i
}
}

workers := make([]*intersectionProcessWorker, 0, workerCnt)
var collectWorker *intersectionCollectWorker
wg := util.WaitGroupWrapper{}
@@ -1566,6 +1588,7 @@ func (w *indexMergeProcessWorker) fetchLoopIntersection(ctx context.Context, fet
indexMerge: w.indexMerge,
memTracker: tracker,
batchSize: batchSize,
partitionIDMap: partitionIDMap,
}
wg.RunWithRecover(func() {
defer trace.StartRegion(ctx, "IndexMergeIntersectionProcessWorker").End()
@@ -1692,7 +1715,7 @@ func (w *partialIndexWorker) needPartitionHandle() (bool, error) {
if needPartitionHandle && !hasExtraCol {
return needPartitionHandle, errors.Errorf("Internal error, needPartitionHandle != ret")
}
return needPartitionHandle, nil
return needPartitionHandle || (col.ID == model.ExtraPidColID), nil
}

func (w *partialIndexWorker) fetchHandles(
