diff --git a/executor/builder.go b/executor/builder.go index 47663e20e512..621cbcf658a7 100644 --- a/executor/builder.go +++ b/executor/builder.go @@ -1637,7 +1637,7 @@ func (b *executorBuilder) buildIndexLookUpJoin(v *plannercore.PhysicalIndexJoin) e.innerCtx.keyCols = innerKeyCols e.joinResult = e.newFirstChunk() metrics.ExecutorCounter.WithLabelValues("IndexLookUpJoin").Inc() - return e + return &IndexLookUpHash{IndexLookUpJoin:*e} } // containsLimit tests if the execs contains Limit because we do not know whether `Limit` has consumed all of its' source, diff --git a/executor/index_lookup_join.go b/executor/index_lookup_join.go index 0a13aff72fe0..f14e6bbb91d0 100644 --- a/executor/index_lookup_join.go +++ b/executor/index_lookup_join.go @@ -15,6 +15,7 @@ package executor import ( "fmt" + "github.com/pingcap/tidb/util" "runtime" "sort" "sync" @@ -68,6 +69,10 @@ type IndexLookUpJoin struct { innerPtrBytes [][]byte memTracker *memory.Tracker // track memory usage. + + joinResultCh chan *indexLookUpResult + joinChkResourceCh []chan *chunk.Chunk + closeCh chan struct{} // closeCh add a lock for closing executor. } type outerCtx struct { @@ -115,6 +120,10 @@ type outerWorker struct { parentMemTracker *memory.Tracker } +type outerHashWorker struct { + outerWorker +} + type innerWorker struct { innerCtx @@ -125,10 +134,32 @@ type innerWorker struct { indexRanges []*ranger.Range keyOff2IdxOff []int + + joiner joiner + maxChunkSize int + workerId int + joinChkResourceCh []chan *chunk.Chunk + closeCh chan struct{} + joinResultCh chan *indexLookUpResult +} + +type innerHashWorker struct { + innerWorker + matchPtrBytes [][]byte +} + +type indexLookUpResult struct { + chk *chunk.Chunk + err error + src chan<- *chunk.Chunk +} + +type IndexLookUpHash struct { + IndexLookUpJoin } // Open implements the Executor interface. -func (e *IndexLookUpJoin) Open(ctx context.Context) error { +func (e *IndexLookUpHash) Open(ctx context.Context) error { // Be careful, very dirty hack in this line!!! // IndexLookUpJoin need to rebuild executor (the dataReaderBuilder) during // executing. However `executor.Next()` is lazy evaluation when the RecordSet @@ -159,8 +190,21 @@ func (e *IndexLookUpJoin) Open(ctx context.Context) error { return nil } -func (e *IndexLookUpJoin) startWorkers(ctx context.Context) { +func (e *IndexLookUpJoin) finishInnerWorker(r interface{}) { + if r != nil { + e.joinResultCh <- &indexLookUpResult{err: errors.Errorf("%v", r)} + } + e.workerWg.Done() +} + +func (e *IndexLookUpJoin) waitInnerHashWorkersAndCloseResultChan() { + e.workerWg.Wait() + close(e.joinResultCh) +} + +func (e *IndexLookUpHash) startWorkers(ctx context.Context) { concurrency := e.ctx.GetSessionVars().IndexLookupJoinConcurrency + concurrency = 1 resultCh := make(chan *lookUpJoinTask, concurrency) e.resultCh = resultCh workerCtx, cancelFunc := context.WithCancel(ctx) @@ -168,86 +212,86 @@ func (e *IndexLookUpJoin) startWorkers(ctx context.Context) { innerCh := make(chan *lookUpJoinTask, concurrency) e.workerWg.Add(1) go e.newOuterWorker(resultCh, innerCh).run(workerCtx, e.workerWg) + + e.closeCh = make(chan struct{}) + e.joinResultCh = make(chan *indexLookUpResult, concurrency+1) + e.joinChkResourceCh = make([]chan *chunk.Chunk, concurrency) + for i := int(0); i < concurrency; i++ { + e.joinChkResourceCh[i] = make(chan *chunk.Chunk, 1) + e.joinChkResourceCh[i] <- e.newFirstChunk() + } e.workerWg.Add(concurrency) - for i := 0; i < concurrency; i++ { - go e.newInnerWorker(innerCh).run(workerCtx, e.workerWg) + for i := int(0); i < concurrency; i++ { + workerId := i + go util.WithRecovery(func() { e.newInnerWorker(innerCh, workerId).run(workerCtx, e.workerWg) }, e.finishInnerWorker) } + go util.WithRecovery(e.waitInnerHashWorkersAndCloseResultChan, nil) } -func (e *IndexLookUpJoin) newOuterWorker(resultCh, innerCh chan *lookUpJoinTask) *outerWorker { - ow := &outerWorker{ - outerCtx: e.outerCtx, - ctx: e.ctx, - executor: e.children[0], - executorChk: chunk.NewChunkWithCapacity(e.outerCtx.rowTypes, e.maxChunkSize), - resultCh: resultCh, - innerCh: innerCh, - batchSize: 32, - maxBatchSize: e.ctx.GetSessionVars().IndexJoinBatchSize, - parentMemTracker: e.memTracker, +func (e *IndexLookUpHash) newOuterWorker(resultCh, innerCh chan *lookUpJoinTask) *outerHashWorker { + ow := &outerHashWorker{ + outerWorker: outerWorker{ + outerCtx: e.outerCtx, + ctx: e.ctx, + executor: e.children[0], + executorChk: chunk.NewChunkWithCapacity(e.outerCtx.rowTypes, e.maxChunkSize), + resultCh: resultCh, + innerCh: innerCh, + batchSize: 32, + maxBatchSize: e.ctx.GetSessionVars().IndexJoinBatchSize, + parentMemTracker: e.memTracker, + }, } return ow } -func (e *IndexLookUpJoin) newInnerWorker(taskCh chan *lookUpJoinTask) *innerWorker { +func (e *IndexLookUpHash) newInnerWorker(taskCh chan *lookUpJoinTask, workerId int) *innerHashWorker { // Since multiple inner workers run concurrently, we should copy join's indexRanges for every worker to avoid data race. copiedRanges := make([]*ranger.Range, 0, len(e.indexRanges)) for _, ran := range e.indexRanges { copiedRanges = append(copiedRanges, ran.Clone()) } - iw := &innerWorker{ - innerCtx: e.innerCtx, - outerCtx: e.outerCtx, - taskCh: taskCh, - ctx: e.ctx, - executorChk: chunk.NewChunkWithCapacity(e.innerCtx.rowTypes, e.maxChunkSize), - indexRanges: copiedRanges, - keyOff2IdxOff: e.keyOff2IdxOff, + iw := &innerHashWorker{ + innerWorker: innerWorker{ + innerCtx: e.innerCtx, + outerCtx: e.outerCtx, + taskCh: taskCh, + ctx: e.ctx, + executorChk: chunk.NewChunkWithCapacity(e.innerCtx.rowTypes, e.maxChunkSize), + indexRanges: copiedRanges, + keyOff2IdxOff: e.keyOff2IdxOff, + joiner: e.joiner, + maxChunkSize: e.maxChunkSize, + workerId: workerId, + closeCh: e.closeCh, + joinChkResourceCh: e.joinChkResourceCh, + joinResultCh: e.joinResultCh, + }, + matchPtrBytes: make([][]byte, 0, 8), } return iw } // Next implements the Executor interface. -func (e *IndexLookUpJoin) Next(ctx context.Context, chk *chunk.Chunk) error { +func (e *IndexLookUpHash) Next(ctx context.Context, chk *chunk.Chunk) error { if e.runtimeStats != nil { start := time.Now() defer func() { e.runtimeStats.Record(time.Now().Sub(start), chk.NumRows()) }() } chk.Reset() - e.joinResult.Reset() - for { - task, err := e.getFinishedTask(ctx) - if err != nil { - return errors.Trace(err) - } - if task == nil { - return nil - } - if e.innerIter == nil || e.innerIter.Current() == e.innerIter.End() { - e.lookUpMatchedInners(task, task.cursor) - e.innerIter = chunk.NewIterator4Slice(task.matchedInners) - e.innerIter.Begin() - } - - outerRow := task.outerResult.GetRow(task.cursor) - if e.innerIter.Current() != e.innerIter.End() { - matched, err := e.joiner.tryToMatch(outerRow, e.innerIter, chk) - if err != nil { - return errors.Trace(err) - } - task.hasMatch = task.hasMatch || matched - } - if e.innerIter.Current() == e.innerIter.End() { - if !task.hasMatch { - e.joiner.onMissMatch(outerRow, chk) - } - task.cursor++ - task.hasMatch = false - } - if chk.NumRows() == e.maxChunkSize { - return nil - } + if e.joinResultCh == nil { + return nil + } + result, ok := <-e.joinResultCh + if !ok { + return nil + } + if result.err != nil { + return errors.Trace(result.err) } + chk.SwapColumns(result.chk) + result.src <- result.chk + return nil } func (e *IndexLookUpJoin) getFinishedTask(ctx context.Context) (*lookUpJoinTask, error) { @@ -281,30 +325,26 @@ func (e *IndexLookUpJoin) getFinishedTask(ctx context.Context) (*lookUpJoinTask, return task, nil } -func (e *IndexLookUpJoin) lookUpMatchedInners(task *lookUpJoinTask, rowIdx int) { - outerKey := task.encodedLookUpKeys.GetRow(rowIdx).GetBytes(0) - e.innerPtrBytes = task.lookupMap.Get(outerKey, e.innerPtrBytes[:0]) - task.matchedInners = task.matchedInners[:0] - - for _, b := range e.innerPtrBytes { - ptr := *(*chunk.RowPtr)(unsafe.Pointer(&b[0])) - matchedInner := task.innerResult.GetRow(ptr) - task.matchedInners = append(task.matchedInners, matchedInner) - } -} - -func (ow *outerWorker) run(ctx context.Context, wg *sync.WaitGroup) { +//func (e *IndexLookUpJoin) lookUpMatchedInners(task *lookUpJoinTask, rowIdx int) { +// outerKey := task.encodedLookUpKeys.GetRow(rowIdx).GetBytes(0) +// e.innerPtrBytes = task.lookupMap.Get(outerKey, e.innerPtrBytes[:0]) +// task.matchedInners = task.matchedInners[:0] +// +// for _, b := range e.innerPtrBytes { +// ptr := *(*chunk.RowPtr)(unsafe.Pointer(&b[0])) +// matchedInner := task.innerResult.GetRow(ptr) +// task.matchedInners = append(task.matchedInners, matchedInner) +// } +//} + +func (ow *outerHashWorker) run(ctx context.Context, wg *sync.WaitGroup) { defer func() { if r := recover(); r != nil { buf := make([]byte, 4096) stackSize := runtime.Stack(buf, false) buf = buf[:stackSize] log.Errorf("outerWorker panic stack is:\n%s", buf) - task := &lookUpJoinTask{doneCh: make(chan error, 1)} - task.doneCh <- errors.Errorf("%v", r) - ow.pushToChan(ctx, task, ow.resultCh) } - close(ow.resultCh) close(ow.innerCh) wg.Done() }() @@ -312,7 +352,6 @@ func (ow *outerWorker) run(ctx context.Context, wg *sync.WaitGroup) { task, err := ow.buildTask(ctx) if err != nil { task.doneCh <- errors.Trace(err) - ow.pushToChan(ctx, task, ow.resultCh) return } if task == nil { @@ -322,10 +361,6 @@ func (ow *outerWorker) run(ctx context.Context, wg *sync.WaitGroup) { if finished := ow.pushToChan(ctx, task, ow.innerCh); finished { return } - - if finished := ow.pushToChan(ctx, task, ow.resultCh); finished { - return - } } } @@ -382,9 +417,40 @@ func (ow *outerWorker) buildTask(ctx context.Context) (*lookUpJoinTask, error) { } task.memTracker.Consume(int64(cap(task.outerMatch))) } + err := ow.buildLookUpMap(task) + if err != nil { + return task, errors.Trace(err) + } return task, nil } +func (ow *outerWorker) buildLookUpMap(task *lookUpJoinTask) error { + keyBuf := make([]byte, 0, 64) + valBuf := make([]byte, 8) + for i := 0; i < task.outerResult.NumRows(); i++ { + if task.outerMatch != nil && !task.outerMatch[i] { + continue + } + outerRow := task.outerResult.GetRow(i) + if ow.hasNullInJoinKey(outerRow) { //skip outer row? + continue + } + keyBuf = keyBuf[:0] + for _, keyCol := range ow.keyCols { + d := outerRow.GetDatum(keyCol, ow.rowTypes[keyCol]) + var err error + keyBuf, err = codec.EncodeKey(ow.ctx.GetSessionVars().StmtCtx, keyBuf, d) + if err != nil { + return errors.Trace(err) + } + } + rowPtr := chunk.RowPtr{ChkIdx: uint32(0), RowIdx: uint32(i)} + *(*chunk.RowPtr)(unsafe.Pointer(&valBuf[0])) = rowPtr + task.lookupMap.Put(keyBuf, valBuf) + } + return nil +} + func (ow *outerWorker) increaseBatchSize() { if ow.batchSize < ow.maxBatchSize { ow.batchSize *= 2 @@ -394,22 +460,29 @@ func (ow *outerWorker) increaseBatchSize() { } } -func (iw *innerWorker) run(ctx context.Context, wg *sync.WaitGroup) { - var task *lookUpJoinTask - defer func() { - if r := recover(); r != nil { - buf := make([]byte, 4096) - stackSize := runtime.Stack(buf, false) - buf = buf[:stackSize] - log.Errorf("innerWorker panic stack is:\n%s", buf) - // "task != nil" is guaranteed when panic happened. - task.doneCh <- errors.Errorf("%v", r) - } - wg.Done() - }() +func (iw *innerHashWorker) getNewJoinResult() (bool, *indexLookUpResult) { + joinResult := &indexLookUpResult{ + src: iw.joinChkResourceCh[iw.workerId], + } + ok := true + select { + case <-iw.closeCh: + ok = false + case joinResult.chk, ok = <-iw.joinChkResourceCh[iw.workerId]: + } + return ok, joinResult +} - for ok := true; ok; { +func (iw *innerHashWorker) run(ctx context.Context, wg *sync.WaitGroup) { + var task *lookUpJoinTask + ok, joinResult := iw.getNewJoinResult() + if !ok { + return + } + for { select { + case <-iw.closeCh: + return case task, ok = <-iw.taskCh: if !ok { return @@ -417,29 +490,178 @@ func (iw *innerWorker) run(ctx context.Context, wg *sync.WaitGroup) { case <-ctx.Done(): return } - - err := iw.handleTask(ctx, task) - task.doneCh <- errors.Trace(err) + err := iw.handleTask(ctx, task, joinResult) + if err != nil { + return + } } } -func (iw *innerWorker) handleTask(ctx context.Context, task *lookUpJoinTask) error { +func (iw *innerHashWorker) handleTask(ctx context.Context, task *lookUpJoinTask, joinResult *indexLookUpResult) error { dLookUpKeys, err := iw.constructDatumLookupKeys(task) if err != nil { return errors.Trace(err) } dLookUpKeys = iw.sortAndDedupDatumLookUpKeys(dLookUpKeys) + //err = iw.fetchAndJoin(ctx, task, dLookUpKeys, joinResult) + //if err != nil { + // return errors.Trace(err) + //} err = iw.fetchInnerResults(ctx, task, dLookUpKeys) if err != nil { return errors.Trace(err) } - err = iw.buildLookUpMap(task) - if err != nil { - return errors.Trace(err) + var ok bool + ok, joinResult = iw.join2Chunk(joinResult, task) + + if joinResult == nil { + return nil + } else if joinResult.err != nil || (joinResult.chk != nil && joinResult.chk.NumRows() > 0) { + iw.joinResultCh <- joinResult + } + if !ok { + return errors.New("join2Chunk failed") } return nil + +} + +//func (iw *innerHashWorker) fetchAndJoin(ctx context.Context, task *lookUpJoinTask, dLookUpKeys [][]types.Datum, joinResult *indexLookUpResult) error { +// innerExec, err := iw.readerBuilder.buildExecutorForIndexJoin(ctx, dLookUpKeys, iw.indexRanges, iw.keyOff2IdxOff) +// if err != nil { +// return errors.Trace(err) +// } +// defer terror.Call(innerExec.Close) +// innerResult := chunk.NewList(innerExec.retTypes(), iw.ctx.GetSessionVars().MaxChunkSize, iw.ctx.GetSessionVars().MaxChunkSize) +// innerResult.GetMemTracker().SetLabel("inner result") +// innerResult.GetMemTracker().AttachTo(task.memTracker) +// iw.executorChk.Reset() +// var ok bool +// for { +// err := innerExec.Next(ctx, iw.executorChk) +// if err != nil { +// return errors.Trace(err) +// } +// if iw.executorChk.NumRows() == 0 { +// break +// } +// ok, joinResult = iw.join2Chunk(iw.executorChk, joinResult, task) +// if !ok { +// break +// } +// } +// //it := task.lookupMap.NewIterator() +// //for i := 0; i < task.outerResult.NumRows(); i++ { +// // key, rowPtr := it.Next() +// // if key == nil || rowPtr == nil { +// // break +// // } +// // iw.innerPtrBytes = task.matchKeyMap.Get(key, iw.innerPtrBytes[:0]) +// // if len(iw.innerPtrBytes) == 0 { +// // ptr := *(*chunk.RowPtr)(unsafe.Pointer(&rowPtr[0])) +// // misMatchedRow := task.outerResult.GetRow(int(ptr.RowIdx)) +// // iw.joiner.onMissMatch(misMatchedRow, joinResult.chk) +// // } +// //} +// if joinResult == nil { +// return nil +// } else if joinResult.err != nil || (joinResult.chk != nil && joinResult.chk.NumRows() > 0) { +// iw.joinResultCh <- joinResult +// } +// if !ok { +// return errors.New("join2Chunk failed") +// } +// return nil +//} + +func (iw *innerHashWorker) join2Chunk(joinResult *indexLookUpResult, task *lookUpJoinTask) (ok bool, _ *indexLookUpResult) { + + for i := 0; i < task.outerResult.NumRows(); i++ { + if task.outerMatch != nil && !task.outerMatch[i] { + continue + } + hasMatch := false + outerRow := task.outerResult.GetRow(i) + for i := 0; i < task.innerResult.NumChunks(); i++ { + chk := task.innerResult.GetChunk(i) + for j := 0; j < chk.NumRows(); j++ { + innerRow := chk.GetRow(j) + if iw.hasNullInJoinKey(innerRow) { + continue + } + task.matchedInners = task.matchedInners[:0] + task.matchedInners = append(task.matchedInners, innerRow) + innerIter := chunk.NewIterator4Slice(task.matchedInners) + innerIter.Begin() + matched, err := iw.joiner.tryToMatch(outerRow, innerIter, joinResult.chk) + if err != nil { + joinResult.err = errors.Trace(err) + return false, joinResult + } + hasMatch = hasMatch || matched + if joinResult.chk.NumRows() == iw.maxChunkSize { + ok := true + iw.joinResultCh <- joinResult + ok, joinResult = iw.getNewJoinResult() + if !ok { + return false, joinResult + } + } + } + } + if !hasMatch { + iw.joiner.onMissMatch(outerRow, joinResult.chk) + } + } + return true, joinResult } +//func (iw *innerHashWorker) joinMatchInnerRow2Chunk(innerRow chunk.Row, task *lookUpJoinTask, +// joinResult *indexLookUpResult) (bool, *indexLookUpResult) { +// keyBuf := make([]byte, 0, 64) +// for _, keyCol := range iw.keyCols { +// d := innerRow.GetDatum(keyCol, iw.rowTypes[keyCol]) +// var err error +// keyBuf, err = codec.EncodeKey(iw.ctx.GetSessionVars().StmtCtx, keyBuf, d) +// if err != nil { +// return false, joinResult +// } +// } +// iw.matchPtrBytes = task.lookupMap.Get(keyBuf, iw.matchPtrBytes[:0]) +// if len(iw.matchPtrBytes) == 0 { +// return true, joinResult +// } +// task.matchedOuters = task.matchedOuters[:0] +// for _, b := range iw.matchPtrBytes { +// ptr := *(*chunk.RowPtr)(unsafe.Pointer(&b[0])) +// matchedOuter := task.outerResult.GetRow(int(ptr.RowIdx)) +// task.matchedOuters = append(task.matchedOuters, matchedOuter) +// } +// outerIter := chunk.NewIterator4Slice(task.matchedOuters) +// hasMatch := false +// +// for outerIter.Begin(); outerIter.Current() != outerIter.End(); { +// matched, err := iw.joiner.tryToMatch(innerRow, outerIter, joinResult.chk) +// if err != nil { +// joinResult.err = errors.Trace(err) +// return false, joinResult +// } +// hasMatch = hasMatch || matched +// if joinResult.chk.NumRows() == iw.maxChunkSize { +// ok := true +// iw.joinResultCh <- joinResult +// ok, joinResult = iw.getNewJoinResult() +// if !ok { +// return false, joinResult +// } +// } +// } +// //if hasMatch { +// // task.matchKeyMap.Put(keyBuf, []byte{0}) +// //} +// return true, joinResult +//} + func (iw *innerWorker) constructDatumLookupKeys(task *lookUpJoinTask) ([][]types.Datum, error) { dLookUpKeys := make([][]types.Datum, 0, task.outerResult.NumRows()) keyBuf := make([]byte, 0, 64) @@ -553,36 +775,17 @@ func (iw *innerWorker) fetchInnerResults(ctx context.Context, task *lookUpJoinTa return nil } -func (iw *innerWorker) buildLookUpMap(task *lookUpJoinTask) error { - keyBuf := make([]byte, 0, 64) - valBuf := make([]byte, 8) - for i := 0; i < task.innerResult.NumChunks(); i++ { - chk := task.innerResult.GetChunk(i) - for j := 0; j < chk.NumRows(); j++ { - innerRow := chk.GetRow(j) - if iw.hasNullInJoinKey(innerRow) { - continue - } - - keyBuf = keyBuf[:0] - for _, keyCol := range iw.keyCols { - d := innerRow.GetDatum(keyCol, iw.rowTypes[keyCol]) - var err error - keyBuf, err = codec.EncodeKey(iw.ctx.GetSessionVars().StmtCtx, keyBuf, d) - if err != nil { - return errors.Trace(err) - } - } - rowPtr := chunk.RowPtr{ChkIdx: uint32(i), RowIdx: uint32(j)} - *(*chunk.RowPtr)(unsafe.Pointer(&valBuf[0])) = rowPtr - task.lookupMap.Put(keyBuf, valBuf) +func (iw *outerWorker) hasNullInJoinKey(row chunk.Row) bool { + for _, ordinal := range iw.keyCols { + if row.IsNull(ordinal) { + return true } } - return nil + return false } -func (iw *innerWorker) hasNullInJoinKey(row chunk.Row) bool { - for _, ordinal := range iw.keyCols { +func (ow *innerWorker) hasNullInJoinKey(row chunk.Row) bool { + for _, ordinal := range ow.keyCols { if row.IsNull(ordinal) { return true } @@ -595,7 +798,19 @@ func (e *IndexLookUpJoin) Close() error { if e.cancelFunc != nil { e.cancelFunc() } - e.workerWg.Wait() + + close(e.closeCh) + if e.joinResultCh != nil { + for range e.joinResultCh { + } + } + for i := range e.joinChkResourceCh { + close(e.joinChkResourceCh[i]) + for range e.joinChkResourceCh[i] { + } + } + e.joinChkResourceCh = nil + e.memTracker.Detach() e.memTracker = nil return errors.Trace(e.children[0].Close())