Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

executor: support index nested loop hash join #8661

Merged
merged 70 commits into from Sep 3, 2019
Merged
Changes from 1 commit
Commits
Show all changes
70 commits
Select commit Hold shift + click to select a range
3ddc5cb
change hash to one by one match
yu34po Dec 17, 2018
25d22fb
add hash join
yu34po Dec 17, 2018
1329c93
carry task err to Next routine
yu34po Dec 18, 2018
0e75d4f
style change and bug fix
yu34po Dec 25, 2018
3f07d74
delete close channel
yu34po Dec 25, 2018
2383635
fomart
yu34po Dec 27, 2018
fede034
Merge branch 'master' into indexjoin
zz-jason Jan 5, 2019
c3d36f6
update some code
yu34po Jan 7, 2019
0a5a602
Merge branch 'indexjoin' of https://github.com/yu34po/tidb into index…
yu34po Jan 7, 2019
0a36649
del hash dedup func
yu34po Jan 7, 2019
4ddcf80
formate
yu34po Jan 7, 2019
e3277f4
change cols to items
yu34po Jan 7, 2019
9e3d3fe
fix nil case test bug
yu34po Jan 7, 2019
6a579e1
fix checkerror
yu34po Jan 8, 2019
62e6e22
change little
yu34po Jan 14, 2019
ce9cb2d
fmt code
yu34po Jan 16, 2019
a928cc2
format code
yu34po Jan 16, 2019
74b7c1a
Merge branch 'master' into indexjoin
yu34po Jan 17, 2019
e1de17d
change hash to one by one match
yu34po Dec 17, 2018
4dc605d
add hash join
yu34po Dec 17, 2018
059b30b
carry task err to Next routine
yu34po Dec 18, 2018
916a384
style change and bug fix
yu34po Dec 25, 2018
88784fa
delete close channel
yu34po Dec 25, 2018
f3dee6d
fomart
yu34po Dec 27, 2018
18e4de9
update some code
yu34po Jan 7, 2019
770bd38
del hash dedup func
yu34po Jan 7, 2019
ecae4af
formate
yu34po Jan 7, 2019
bea13ab
change cols to items
yu34po Jan 7, 2019
187fae7
fix nil case test bug
yu34po Jan 7, 2019
4a13576
fix checkerror
yu34po Jan 8, 2019
9eef4e3
change little
yu34po Jan 14, 2019
007a11a
fmt code
yu34po Jan 16, 2019
ef825e7
format code
yu34po Jan 16, 2019
b9b9f5c
fix Next func
yu34po Jan 17, 2019
f12d573
change next func
yu34po Jan 17, 2019
000d74d
add func
yu34po Jan 17, 2019
c2c30e6
fix merge conflict
yu34po Jul 8, 2019
f6af1cd
null test case
yu34po Jul 8, 2019
78d216a
lint
yu34po Jul 8, 2019
54c81b5
remove some vari
yu34po Jul 8, 2019
98ad68a
fmt
yu34po Jul 8, 2019
a7fef3f
fix ineffectual
yu34po Jul 8, 2019
f1c3977
Merge branch 'master' into indexjoin
yu34po Jul 8, 2019
581b697
code refine
XuHuaiyu Aug 20, 2019
e28987a
code refine
XuHuaiyu Aug 21, 2019
c30504d
test
XuHuaiyu Aug 21, 2019
eb0b643
Merge branch 'master' of https://github.com/pingcap/tidb into yu34po/…
XuHuaiyu Aug 21, 2019
c1f9a71
test
XuHuaiyu Aug 21, 2019
d4ac7e7
test
XuHuaiyu Aug 21, 2019
8c92cd7
test
XuHuaiyu Aug 23, 2019
fad84f4
fix ci
XuHuaiyu Aug 23, 2019
a79975f
code refine
XuHuaiyu Aug 23, 2019
f267865
Merge branch 'master' of https://github.com/pingcap/tidb into yu34po/…
XuHuaiyu Aug 23, 2019
5a25f27
code refine
XuHuaiyu Aug 23, 2019
3f7682c
address comment
XuHuaiyu Aug 26, 2019
b4a4f08
address comment
XuHuaiyu Aug 27, 2019
043dded
address comment
XuHuaiyu Sep 2, 2019
22e7215
address comment
XuHuaiyu Sep 2, 2019
e2f2b06
address comment
XuHuaiyu Sep 2, 2019
afde839
address comment
XuHuaiyu Sep 2, 2019
fd8c6ad
address comment
XuHuaiyu Sep 2, 2019
3ceeb1e
Merge branch 'master' into indexjoin
sre-bot Sep 2, 2019
c05a2f2
Merge branch 'master' into indexjoin
zz-jason Sep 2, 2019
041d625
Merge branch 'master' of https://github.com/pingcap/tidb into yu34po/…
XuHuaiyu Sep 2, 2019
7b642d4
fix ci
XuHuaiyu Sep 3, 2019
099e802
Merge branch 'master' of https://github.com/pingcap/tidb into yu34po/…
XuHuaiyu Sep 3, 2019
e21e18e
Merge branch 'indexjoin' of https://github.com/yu34po/tidb into yu34p…
XuHuaiyu Sep 3, 2019
841ce18
fix data race
XuHuaiyu Sep 3, 2019
4028a91
Merge branch 'master' of https://github.com/pingcap/tidb into yu34po/…
XuHuaiyu Sep 3, 2019
d266812
address comment
XuHuaiyu Sep 3, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
50 changes: 27 additions & 23 deletions executor/index_lookup_hash_join.go
Expand Up @@ -15,8 +15,9 @@ package executor

import (
"context"
"hash"
"hash/fnv"
"sync"
"unsafe"

"github.com/pingcap/errors"
"github.com/pingcap/tidb/util"
Expand Down Expand Up @@ -53,12 +54,13 @@ type indexHashJoinOuterWorker struct {

type indexHashJoinInnerWorker struct {
innerWorker
matchedOuterPtrs [][]byte
matchedOuterPtrs []chunk.RowPtr
joiner joiner
joinChkResourceCh chan *chunk.Chunk
resultCh chan *indexHashJoinResult
taskCh <-chan *indexHashJoinTask
wg *sync.WaitGroup
joinKeyBuf []byte
}

type indexHashJoinResult struct {
Expand All @@ -78,6 +80,7 @@ const (
type indexHashJoinTask struct {
*lookUpJoinTask
outerRowStatus []outerRowStatusFlag
lookupMap *rowHashMap
err error
}

Expand Down Expand Up @@ -266,7 +269,8 @@ func (e *IndexNestedLoopHashJoin) newInnerWorker(taskCh chan *indexHashJoinTask,
joiner: e.joiner,
joinChkResourceCh: e.joinChkResourceCh[workerID],
resultCh: e.resultCh,
matchedOuterPtrs: make([][]byte, 0, 8),
matchedOuterPtrs: make([]chunk.RowPtr, 0, e.maxChunkSize),
joinKeyBuf: make([]byte, 1),
}
return iw
}
Expand All @@ -278,6 +282,7 @@ func (iw *indexHashJoinInnerWorker) run(ctx context.Context, cancelFunc context.
cancelFunc()
return
}
h := fnv.New64()
for {
select {
case <-ctx.Done():
Expand All @@ -291,7 +296,7 @@ func (iw *indexHashJoinInnerWorker) run(ctx context.Context, cancelFunc context.
joinResult.err = task.err
break
}
err := iw.handleTask(ctx, cancelFunc, task, joinResult)
err := iw.handleTask(ctx, cancelFunc, task, joinResult, h)
if err != nil {
joinResult.err = err
break
Expand Down Expand Up @@ -324,12 +329,10 @@ func (iw *indexHashJoinInnerWorker) getNewJoinResult(ctx context.Context) (*inde
return joinResult, ok
}

func (iw *indexHashJoinInnerWorker) buildHashTableForOuterResult(ctx context.Context, cancelFunc context.CancelFunc, task *indexHashJoinTask) {
var (
keyBuf, valBuf = make([]byte, 0, 64), make([]byte, 8)
rowIdx, numRows = 0, task.outerResult.NumRows()
err error
)
func (iw *indexHashJoinInnerWorker) buildHashTableForOuterResult(ctx context.Context, cancelFunc context.CancelFunc, task *indexHashJoinTask, h hash.Hash64) {
rowIdx, numRows := 0, task.outerResult.NumRows()
buf := make([]byte, 1)
task.lookupMap = newRowHashMap()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

newRowHashMap is added an argument here, https://github.com/pingcap/tidb/pull/11937/files#diff-4f7e05240341383641662778116ddf13R236
We should merge #11937, then fix it here.

OUTER:
for ; rowIdx < numRows; rowIdx++ {
if task.outerMatch != nil && !task.outerMatch[rowIdx] {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No need to consider outerMatch when building the hash table?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The unmatched outer rows can be called with onMissMatch,
we consider the outerMatch here to save the map.Get and map.Put overhead.

Expand All @@ -342,14 +345,15 @@ OUTER:
continue OUTER
}
}
keyBuf, err = codec.HashChunkRow(iw.ctx.GetSessionVars().StmtCtx, keyBuf[:0], row, iw.outerCtx.rowTypes, keyColIdx)
h.Reset()
err := codec.HashChunkRow(iw.ctx.GetSessionVars().StmtCtx, h, row, iw.outerCtx.rowTypes, keyColIdx, buf)
if err != nil {
cancelFunc()
logutil.Logger(ctx).Error("indexHashJoinInnerWorker.buildHashTableForOuterResult failed", zap.Error(err))
return
}
*(*int)(unsafe.Pointer(&valBuf[0])) = rowIdx
task.lookupMap.Put(keyBuf, valBuf)
rowPtr := chunk.RowPtr{ChkIdx: 0, RowIdx: uint32(rowIdx)}
task.lookupMap.Put(h.Sum64(), rowPtr)
}
return
}
Expand All @@ -370,24 +374,24 @@ func (iw *indexHashJoinInnerWorker) handleHashJoinInnerWorkerPanic(r interface{}
iw.wg.Done()
}

XuHuaiyu marked this conversation as resolved.
Show resolved Hide resolved
func (iw *indexHashJoinInnerWorker) handleTask(ctx context.Context, cancelFunc context.CancelFunc, task *indexHashJoinTask, joinResult *indexHashJoinResult) error {
func (iw *indexHashJoinInnerWorker) handleTask(ctx context.Context, cancelFunc context.CancelFunc, task *indexHashJoinTask, joinResult *indexHashJoinResult, h hash.Hash64) error {
iw.wg = &sync.WaitGroup{}
iw.wg.Add(1)
// TODO(XuHuaiyu): we may always use the smaller side to build the hashtable.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no need to consider this TODO action? Since we only build hash table on a batch of records, no matter whether the records comes from the smaller or bigger side table, the total memory usage is limited.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

smaller means whether the outer batch is smaller or the inner result fetched according to the outer batch is smaller.

go util.WithRecovery(func() { iw.buildHashTableForOuterResult(ctx, cancelFunc, task) }, iw.handleHashJoinInnerWorkerPanic)
go util.WithRecovery(func() { iw.buildHashTableForOuterResult(ctx, cancelFunc, task, h) }, iw.handleHashJoinInnerWorkerPanic)
err := iw.fetchInnerResults(ctx, task.lookUpJoinTask)
if err != nil {
return err
}
iw.wg.Wait()
return iw.doJoin(ctx, task, joinResult)
return iw.doJoin(ctx, task, joinResult, h)
}

func (iw *indexHashJoinInnerWorker) doJoin(ctx context.Context, task *indexHashJoinTask, joinResult *indexHashJoinResult) error {
func (iw *indexHashJoinInnerWorker) doJoin(ctx context.Context, task *indexHashJoinTask, joinResult *indexHashJoinResult, h hash.Hash64) error {
var ok bool
iter := chunk.NewIterator4List(task.innerResult)
for row := iter.Begin(); row != iter.End(); row = iter.Next() {
ok, joinResult = iw.joinMatchedInnerRow2Chunk(ctx, row, task, joinResult)
ok, joinResult = iw.joinMatchedInnerRow2Chunk(ctx, row, task, joinResult, h, iw.joinKeyBuf)
if !ok {
return errors.New("indexHashJoinInnerWorker.handleTask failed")
}
Expand All @@ -412,23 +416,23 @@ func (iw *indexHashJoinInnerWorker) doJoin(ctx context.Context, task *indexHashJ
}

func (iw *indexHashJoinInnerWorker) joinMatchedInnerRow2Chunk(ctx context.Context, innerRow chunk.Row, task *indexHashJoinTask,
joinResult *indexHashJoinResult) (bool, *indexHashJoinResult) {
joinResult *indexHashJoinResult, h hash.Hash64, buf []byte) (bool, *indexHashJoinResult) {
var err error
keyBuf := make([]byte, 0, 64)
keyBuf, err = codec.HashChunkRow(iw.ctx.GetSessionVars().StmtCtx, keyBuf, innerRow, iw.rowTypes, iw.keyCols)
h.Reset()
err = codec.HashChunkRow(iw.ctx.GetSessionVars().StmtCtx, h, innerRow, iw.rowTypes, iw.keyCols, buf)
if err != nil {
joinResult.err = err
return false, joinResult
}
iw.matchedOuterPtrs = task.lookupMap.Get(keyBuf, iw.matchedOuterPtrs[:0])
iw.matchedOuterPtrs = task.lookupMap.Get(h.Sum64())
if len(iw.matchedOuterPtrs) == 0 {
return true, joinResult
}
innerIter := chunk.NewIterator4Slice([]chunk.Row{innerRow})
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It executes very slowly here...The tryToMatch function only match just one row. Can we change it by matching outerRows to innerRow?

var ok bool
for _, ptr := range iw.matchedOuterPtrs {
innerIter.Begin()
rowIdx := *(*int)(unsafe.Pointer(&ptr[0]))
rowIdx := int(ptr.RowIdx)
outerRow := task.outerResult.GetRow(rowIdx)
matched, isNull, err := iw.joiner.tryToMatch(outerRow, innerIter, joinResult.chk)
if err != nil {
Expand Down