Skip to content

Commit

Permalink
fix ci fail
Browse files Browse the repository at this point in the history
Signed-off-by: xufei <xufeixw@mail.ustc.edu.cn>
  • Loading branch information
windtalker committed Apr 29, 2024
1 parent 7011b7e commit 2398052
Show file tree
Hide file tree
Showing 13 changed files with 159 additions and 143 deletions.
2 changes: 1 addition & 1 deletion pkg/executor/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ go_library(
"//pkg/executor/lockstats",
"//pkg/executor/metrics",
"//pkg/executor/sortexec",
"//pkg/executor/union",
"//pkg/executor/unionexec",
"//pkg/expression",
"//pkg/expression/aggregation",
"//pkg/expression/context",
Expand Down
6 changes: 3 additions & 3 deletions pkg/executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ import (
"github.com/pingcap/tidb/pkg/executor/lockstats"
executor_metrics "github.com/pingcap/tidb/pkg/executor/metrics"
"github.com/pingcap/tidb/pkg/executor/sortexec"
"github.com/pingcap/tidb/pkg/executor/union"
"github.com/pingcap/tidb/pkg/executor/unionexec"
"github.com/pingcap/tidb/pkg/expression"
"github.com/pingcap/tidb/pkg/expression/aggregation"
"github.com/pingcap/tidb/pkg/infoschema"
Expand Down Expand Up @@ -1255,7 +1255,7 @@ func (b *executorBuilder) buildUnionScanExec(v *plannercore.PhysicalUnionScan) e
// Be careful to avoid data race.
func (b *executorBuilder) buildUnionScanFromReader(reader exec.Executor, v *plannercore.PhysicalUnionScan) exec.Executor {
// If reader is union, it means a partition table and we should transfer as above.
if x, ok := reader.(*union.UnionExec); ok {
if x, ok := reader.(*unionexec.UnionExec); ok {
for i, child := range x.AllChildren() {
x.SetChildren(i, b.buildUnionScanFromReader(child, v))
if b.err != nil {
Expand Down Expand Up @@ -2286,7 +2286,7 @@ func (b *executorBuilder) buildUnionAll(v *plannercore.PhysicalUnionAll) exec.Ex
return nil
}
}
e := &union.UnionExec{
e := &unionexec.UnionExec{
BaseExecutor: exec.NewBaseExecutor(b.ctx, v.Schema(), v.ID(), childExecs...),
Concurrency: b.ctx.GetSessionVars().UnionConcurrency(),
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/executor/join/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ go_library(
"//pkg/executor/internal/applycache",
"//pkg/executor/internal/exec",
"//pkg/executor/internal/vecgroupchecker",
"//pkg/executor/union",
"//pkg/executor/unionexec",
"//pkg/expression",
"//pkg/parser/mysql",
"//pkg/parser/terror",
Expand Down
2 changes: 2 additions & 0 deletions pkg/executor/join/hash_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ type HashContext struct {
naColNullBitMap []*bitmap.ConcurrentBitmap
}

// InitHash initializes the HashContext.
func (hc *HashContext) InitHash(rows int) {
if hc.Buf == nil {
hc.Buf = make([]byte, 1)
Expand Down Expand Up @@ -608,6 +609,7 @@ func (es *entryStore) GetStore() (e *entry, memDelta int64) {
return
}

// BaseHashTable is the interface of the hash table used in hash join
type BaseHashTable interface {
Put(hashKey uint64, rowPtr chunk.RowPtr)
// e := Get(hashKey)
Expand Down
36 changes: 18 additions & 18 deletions pkg/executor/join/index_lookup_hash_join.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,15 +47,15 @@ import (
// the execution of IndexNestedLoopHashJoin.
const numResChkHold = 4

// IndexNestedLoopHashJoin employs one Outer worker and N inner workers to
// IndexNestedLoopHashJoin employs one outer worker and N inner workers to
// execute concurrently. The output order is not promised.
//
// The execution flow is very similar to IndexLookUpReader:
// 1. The Outer worker reads N Outer rows, builds a task and sends it to the
// 1. The outer worker reads N outer rows, builds a task and sends it to the
// inner worker channel.
// 2. The inner worker receives the tasks and does 3 things for every task:
// 1. builds hash table from the Outer rows
// 2. builds key ranges from Outer rows and fetches inner rows
// 1. builds hash table from the outer rows
// 2. builds key ranges from outer rows and fetches inner rows
// 3. probes the hash table and sends the join result to the main thread channel.
// Note: step 1 and step 2 runs concurrently.
//
Expand All @@ -64,8 +64,8 @@ type IndexNestedLoopHashJoin struct {
IndexLookUpJoin
resultCh chan *indexHashJoinResult
joinChkResourceCh []chan *chunk.Chunk
// We build individual Joiner for each inner worker when using chunk-based
// execution, to avoid the concurrency of Joiner.chk and Joiner.selected.
// We build individual joiner for each inner worker when using chunk-based
// execution, to avoid the concurrency of joiner.chk and joiner.selected.
Joiners []Joiner
KeepOuterOrder bool
curTask *indexHashJoinTask
Expand All @@ -84,7 +84,7 @@ type indexHashJoinOuterWorker struct {
outerWorker
innerCh chan *indexHashJoinTask
keepOuterOrder bool
// taskCh is only used when the Outer order needs to be promised.
// taskCh is only used when the outer order needs to be promised.
taskCh chan *indexHashJoinTask
}

Expand Down Expand Up @@ -114,14 +114,14 @@ type indexHashJoinTask struct {
lookupMap BaseHashTable
err error
keepOuterOrder bool
// resultCh is only used when the Outer order needs to be promised.
// resultCh is only used when the outer order needs to be promised.
resultCh chan *indexHashJoinResult
// matchedInnerRowPtrs is only valid when the Outer order needs to be
// matchedInnerRowPtrs is only valid when the outer order needs to be
// promised. Otherwise, it will be nil.
// len(matchedInnerRowPtrs) equals to
// lookUpJoinTask.outerResult.NumChunks(), and the elements of every
// matchedInnerRowPtrs[chkIdx][rowIdx] indicates the matched inner Row ptrs
// of the corresponding Outer Row.
// matchedInnerRowPtrs[chkIdx][rowIdx] indicates the matched inner row ptrs
// of the corresponding outer row.
matchedInnerRowPtrs [][][]chunk.RowPtr
}

Expand Down Expand Up @@ -513,14 +513,14 @@ func (iw *indexHashJoinInnerWorker) run(ctx context.Context, cancelFunc context.
}
err := iw.handleTask(ctx, task, joinResult, h, resultCh)
if err != nil && !task.keepOuterOrder {
// Only need to check the non-keep-Outer-order case because the
// `JoinResult` had been sent to the `resultCh` when err != nil.
// Only need to check the non-keep-outer-order case because the
// `joinResult` had been sent to the `resultCh` when err != nil.
joinResult.err = err
break
}
if task.keepOuterOrder {
// We need to get a new result holder here because the old
// `JoinResult` has been sent to the `resultCh` or to the
// `joinResult` has been sent to the `resultCh` or to the
// `joinChkResourceCh`.
joinResult, ok = iw.getNewJoinResult(ctx)
if !ok {
Expand All @@ -533,7 +533,7 @@ func (iw *indexHashJoinInnerWorker) run(ctx context.Context, cancelFunc context.
joinResult.err = errors.New("mockIndexHashJoinInnerWorkerErr")
})
// When task.KeepOuterOrder is TRUE (resultCh != iw.resultCh):
// - the last JoinResult will be handled when the task has been processed,
// - the last joinResult will be handled when the task has been processed,
// thus we DO NOT need to check it here again.
// - we DO NOT check the error here either, because:
// - if the error is from task.err, the main thread will check the error of each task
Expand Down Expand Up @@ -807,10 +807,10 @@ func (iw *indexHashJoinInnerWorker) collectMatchedInnerPtrs4OuterRows(innerRow c
}

// doJoinInOrder follows the following steps:
// 1. collect all the matched inner Row ptrs for every Outer Row
// 1. collect all the matched inner row ptrs for every outer row
// 2. do the join work
// 2.1 collect all the matched inner rows using the collected ptrs for every Outer Row
// 2.2 call TryToMatchInners for every Outer Row
// 2.1 collect all the matched inner rows using the collected ptrs for every outer row
// 2.2 call TryToMatchInners for every outer row
// 2.3 call OnMissMatch when no inner rows are matched
func (iw *indexHashJoinInnerWorker) doJoinInOrder(ctx context.Context, task *indexHashJoinTask, joinResult *indexHashJoinResult, h hash.Hash64, resultCh chan *indexHashJoinResult) (err error) {
defer func() {
Expand Down
18 changes: 12 additions & 6 deletions pkg/executor/join/index_lookup_join.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,14 +48,14 @@ import (

var _ exec.Executor = &IndexLookUpJoin{}

// IndexLookUpJoin employs one Outer worker and N innerWorkers to execute concurrently.
// It preserves the order of the Outer table and supports batch lookup.
// IndexLookUpJoin employs one outer worker and N innerWorkers to execute concurrently.
// It preserves the order of the outer table and supports batch lookup.
//
// The execution flow is very similar to IndexLookUpReader:
// 1. The outerWorker reads N Outer rows, builds a task and sends it to the result channel and the inner worker channel.
// 2. The innerWorker receives the task, builds key ranges from Outer rows, fetches inner rows and builds the inner Row hash map.
// 1. The outerWorker reads N outer rows, builds a task and sends it to the result channel and the inner worker channel.
// 2. The innerWorker receives the task, builds key ranges from outer rows, fetches inner rows and builds the inner row hash map.
// 3. The main thread receives the task and waits for the inner worker to finish handling it.
// 4. The main thread joins each Outer Row by looking up the inner rows hash map in the task.
// 4. The main thread joins each outer row by looking up the inner rows hash map in the task.
type IndexLookUpJoin struct {
exec.BaseExecutor

Expand All @@ -79,7 +79,7 @@ type IndexLookUpJoin struct {
KeyOff2IdxOff []int
innerPtrBytes [][]byte

// LastColHelper stores the information for the last col if there's a complicated Filter like col > x_col and col < x_col + 100.
// LastColHelper stores the information for the last col if there's a complicated filter like col > x_col and col < x_col + 100.
LastColHelper *plannercore.ColWithCmpFuncManager

memTracker *memory.Tracker // track memory usage.
Expand All @@ -89,18 +89,23 @@ type IndexLookUpJoin struct {
prepared bool
}

// OuterCtx is the outer side ctx used in index lookup join.
// NOTE(review): the per-field notes below are inferred from the field names
// and types only; confirm them against the code that fills in and reads this struct.
type OuterCtx struct {
// RowTypes: presumably the field types of the outer rows.
RowTypes []*types.FieldType
// KeyCols: presumably the column offsets of the outer join keys.
KeyCols []int
// HashTypes and HashCols: presumably the field types and column offsets used for hashing.
HashTypes []*types.FieldType
HashCols []int
// Filter is a list of CNF expressions; presumably evaluated on outer rows — TODO confirm.
Filter expression.CNFExprs
}

// IndexJoinExecutorBuilder is the interface used by index lookup join to build the executor. This interface
// is added to avoid a cyclic import.
type IndexJoinExecutorBuilder interface {
// BuildExecutorForIndexJoin returns an executor (or an error) built from the given lookup contents,
// index ranges and key-offset mapping.
// NOTE(review): the roles of canReorderHandles, memTracker and interruptSignal are not visible here;
// check the implementation before relying on them.
BuildExecutorForIndexJoin(ctx context.Context, lookUpContents []*IndexJoinLookUpContent,
indexRanges []*ranger.Range, keyOff2IdxOff []int, cwc *plannercore.ColWithCmpFuncManager, canReorderHandles bool, memTracker *memory.Tracker, interruptSignal *atomic.Value) (exec.Executor, error)
}

// InnerCtx is the inner side ctx used in index lookup join
type InnerCtx struct {
ReaderBuilder IndexJoinExecutorBuilder
RowTypes []*types.FieldType
Expand Down Expand Up @@ -525,6 +530,7 @@ func (iw *innerWorker) run(ctx context.Context, wg *sync.WaitGroup) {
}
}

// IndexJoinLookUpContent is the content used in index lookup join
type IndexJoinLookUpContent struct {
Keys []types.Datum
Row chunk.Row
Expand Down
24 changes: 13 additions & 11 deletions pkg/executor/join/index_lookup_merge_join.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,13 @@ import (
)

// IndexLookUpMergeJoin realizes IndexLookUpJoin by merge join
// It preserves the order of the Outer table and supports batch lookup.
// It preserves the order of the outer table and supports batch lookup.
//
// The execution flow is very similar to IndexLookUpReader:
// 1. The outerWorker reads N Outer rows, builds a task and sends it to the result channel and the inner worker channel.
// 2. The innerWorker receives the task, builds key ranges from Outer rows, fetches inner rows, and then does the merge join.
// 1. The outerWorker reads N outer rows, builds a task and sends it to the result channel and the inner worker channel.
// 2. The innerWorker receives the task, builds key ranges from outer rows, fetches inner rows, and then does the merge join.
// 3. The main thread receives the task and fetches results from the channel in the task one by one.
// 4. If the channel has been closed, the main thread receives the Next task.
// 4. If the channel has been closed, the main thread receives the next task.
type IndexLookUpMergeJoin struct {
exec.BaseExecutor

Expand All @@ -69,13 +69,14 @@ type IndexLookUpMergeJoin struct {
IndexRanges ranger.MutableRanges
KeyOff2IdxOff []int

// LastColHelper stores the information for the last col if there's a complicated Filter like col > x_col and col < x_col + 100.
// LastColHelper stores the information for the last col if there's a complicated filter like col > x_col and col < x_col + 100.
LastColHelper *plannercore.ColWithCmpFuncManager

memTracker *memory.Tracker // track memory usage
prepared bool
}

// OuterMergeCtx is the outer side ctx of merge join
type OuterMergeCtx struct {
RowTypes []*types.FieldType
JoinKeys []*expression.Column
Expand All @@ -85,6 +86,7 @@ type OuterMergeCtx struct {
CompareFuncs []expression.CompareFunc
}

// InnerMergeCtx is the inner side ctx of merge join
type InnerMergeCtx struct {
ReaderBuilder IndexJoinExecutorBuilder
RowTypes []*types.FieldType
Expand Down Expand Up @@ -172,7 +174,7 @@ func (e *IndexLookUpMergeJoin) startWorkers(ctx context.Context) {
concurrency := e.Ctx().GetSessionVars().IndexLookupJoinConcurrency()
if e.RuntimeStats() != nil {
runtimeStats := &execdetails.RuntimeStatsWithConcurrencyInfo{}
runtimeStats.SetConcurrencyInfo(execdetails.NewConcurrencyInfo("concurrency", concurrency))
runtimeStats.SetConcurrencyInfo(execdetails.NewConcurrencyInfo("Concurrency", concurrency))
e.Ctx().GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.ID(), runtimeStats)
}

Expand Down Expand Up @@ -446,9 +448,9 @@ func (imw *innerMergeWorker) handleTask(ctx context.Context, task *lookUpMergeJo
panic("OOM test index merge join doesn't hang here.")
}
})
// NeedOuterSort means the Outer side property items can't guarantee the order of join Keys.
// A necessary condition of merge join is that both the Outer and inner sides keep the order of the join Keys.
// In this case, we need to sort the Outer side.
// NeedOuterSort means the outer side property items can't guarantee the order of join keys.
// A necessary condition of merge join is that both the outer and inner sides keep the order of the join keys.
// In this case, we need to sort the outer side.
if imw.outerMergeCtx.NeedOuterSort {
exprCtx := imw.ctx.GetExprCtx()
slices.SortFunc(task.outerOrderIdx, func(idxI, idxJ chunk.RowPtr) int {
Expand Down Expand Up @@ -606,7 +608,7 @@ func (imw *innerMergeWorker) doMergeJoin(ctx context.Context, task *lookUpMergeJ
return nil
}

// fetchInnerRowsWithSameKey collects the inner rows having the same key with one Outer Row.
// fetchInnerRowsWithSameKey collects the inner rows having the same key with one outer row.
func (imw *innerMergeWorker) fetchInnerRowsWithSameKey(ctx context.Context, task *lookUpMergeJoinTask, key chunk.Row) (noneInnerRows bool, err error) {
task.sameKeyInnerRows = task.sameKeyInnerRows[:0]
curRow := task.innerIter.Current()
Expand Down Expand Up @@ -738,7 +740,7 @@ func (e *IndexLookUpMergeJoin) Close() error {
e.joinChkResourceCh = nil
// joinChkResourceCh is to recycle result chunks, used by inner worker.
// resultCh is where the main thread gets the results; it is used by the main thread and the inner worker.
// cancelFunc controls the Outer worker, and the Outer worker closes the task channel.
// cancelFunc controls the outer worker, and the outer worker closes the task channel.
e.WorkerWg.Wait()
e.memTracker = nil
e.prepared = false
Expand Down

0 comments on commit 2398052

Please sign in to comment.