From 029faf0025a5ce3fed20c4040db8dc8e9057e180 Mon Sep 17 00:00:00 2001 From: HuaiyuXu <391585975@qq.com> Date: Thu, 17 Oct 2019 14:48:24 +0800 Subject: [PATCH] executor: make IndexHashJoin support keeping the outer order (#12349) --- cmd/explaintest/r/topn_push_down.result | 10 +- cmd/explaintest/r/tpch.result | 88 ++++---- executor/builder.go | 5 +- executor/index_lookup_hash_join.go | 262 +++++++++++++++++++++--- executor/index_lookup_merge_join.go | 2 - executor/join_test.go | 35 ++++ planner/core/exhaust_physical_plans.go | 8 +- planner/core/physical_plans.go | 4 +- 8 files changed, 329 insertions(+), 85 deletions(-) diff --git a/cmd/explaintest/r/topn_push_down.result b/cmd/explaintest/r/topn_push_down.result index 1539f2f701391..bcbca7bc747cc 100644 --- a/cmd/explaintest/r/topn_push_down.result +++ b/cmd/explaintest/r/topn_push_down.result @@ -169,8 +169,8 @@ LIMIT 0, 5; id count task operator info Projection_13 0.00 root Column#47 └─Limit_19 0.00 root offset:0, count:5 - └─IndexJoin_104 0.00 root left outer join, inner:IndexReader_103, outer key:Column#1, inner key:Column#97 - ├─TopN_111 0.00 root Column#47:asc, offset:0, count:5 + └─IndexJoin_106 0.00 root left outer join, inner:IndexReader_105, outer key:Column#1, inner key:Column#97 + ├─TopN_115 0.00 root Column#47:asc, offset:0, count:5 │ └─IndexJoin_43 0.00 root inner join, inner:IndexLookUp_42, outer key:Column#1, inner key:Column#41 │ ├─IndexLookUp_86 0.00 root │ │ ├─Selection_84 0.00 cop[tikv] eq(Column#4, 18), eq(Column#5, 1) @@ -181,9 +181,9 @@ Projection_13 0.00 root Column#47 │ ├─IndexScan_39 1.25 cop[tikv] table:te, index:trade_id, range: decided by [eq(Column#41, Column#1)], keep order:false, stats:pseudo │ └─Selection_41 0.03 cop[tikv] ge(Column#47, 2018-04-23 00:00:00.000000), le(Column#47, 2018-04-23 23:59:59.000000) │ └─TableScan_40 1.25 cop[tikv] table:te, keep order:false, stats:pseudo - └─IndexReader_103 1.25 root index:Selection_102 - └─Selection_102 1.25 cop[tikv] not(isnull(Column#97)) - └─IndexScan_101 1.25 cop[tikv] table:p, index:relate_id, range: decided by [eq(Column#97, Column#1)], keep order:false, stats:pseudo + └─IndexReader_105 1.25 root index:Selection_104 + └─Selection_104 1.25 cop[tikv] not(isnull(Column#97)) + └─IndexScan_103 1.25 cop[tikv] table:p, index:relate_id, range: decided by [eq(Column#97, Column#1)], keep order:false, stats:pseudo desc select 1 as a from dual order by a limit 1; id count task operator info Projection_6 1.00 root 1 diff --git a/cmd/explaintest/r/tpch.result b/cmd/explaintest/r/tpch.result index 9eba9db193d58..2a01b34971816 100644 --- a/cmd/explaintest/r/tpch.result +++ b/cmd/explaintest/r/tpch.result @@ -252,15 +252,15 @@ id count task operator info Projection_14 10.00 root Column#18, Column#35, Column#13, Column#16 └─TopN_17 10.00 root Column#35:desc, Column#13:asc, offset:0, count:10 └─HashAgg_23 40252367.98 root group by:Column#49, Column#50, Column#51, funcs:sum(Column#45), firstrow(Column#46), firstrow(Column#47), firstrow(Column#48) - └─Projection_79 91515927.49 root mul(Column#23, minus(1, Column#24)), Column#13, Column#16, Column#18, Column#18, Column#13, Column#16 + └─Projection_81 91515927.49 root mul(Column#23, minus(1, Column#24)), Column#13, Column#16, Column#18, Column#18, Column#13, Column#16 └─IndexHashJoin_38 91515927.49 root inner join, inner:IndexLookUp_28, outer key:Column#9, inner key:Column#18 - ├─HashRightJoin_69 22592975.51 root inner join, inner:TableReader_75, equal:[eq(Column#1, Column#10)] - │ ├─TableReader_75 1498236.00 root data:Selection_74 - │ │ └─Selection_74 1498236.00 cop[tikv] eq(Column#7, "AUTOMOBILE") - │ │ └─TableScan_73 7500000.00 cop[tikv] table:customer, range:[-inf,+inf], keep order:false - │ └─TableReader_72 36870000.00 root data:Selection_71 - │ └─Selection_71 36870000.00 cop[tikv] lt(Column#13, 1995-03-13 00:00:00.000000) - │ └─TableScan_70 75000000.00 cop[tikv] table:orders, range:[-inf,+inf], keep order:false + ├─HashRightJoin_71 22592975.51 root inner join, inner:TableReader_77, equal:[eq(Column#1, Column#10)] + │ ├─TableReader_77 1498236.00 root data:Selection_76 + │ │ └─Selection_76 1498236.00 cop[tikv] eq(Column#7, "AUTOMOBILE") + │ │ └─TableScan_75 7500000.00 cop[tikv] table:customer, range:[-inf,+inf], keep order:false + │ └─TableReader_74 36870000.00 root data:Selection_73 + │ └─Selection_73 36870000.00 cop[tikv] lt(Column#13, 1995-03-13 00:00:00.000000) + │ └─TableScan_72 75000000.00 cop[tikv] table:orders, range:[-inf,+inf], keep order:false └─IndexLookUp_28 2.20 root ├─IndexScan_25 4.05 cop[tikv] table:lineitem, index:L_ORDERKEY, L_LINENUMBER, range: decided by [eq(Column#18, Column#9)], keep order:false └─Selection_27 2.20 cop[tikv] gt(Column#28, 1995-03-13 00:00:00.000000) @@ -596,24 +596,24 @@ Sort_25 2406.00 root Column#57:asc, Column#58:desc └─Projection_27 2406.00 root Column#53, Column#54, Column#56 └─HashAgg_30 2406.00 root group by:Column#53, Column#54, funcs:sum(Column#55), firstrow(Column#53), firstrow(Column#54) └─Projection_31 971049283.51 root Column#50, extract("YEAR", Column#44), minus(mul(Column#22, minus(1, Column#23)), mul(Column#37, Column#21)) - └─HashLeftJoin_44 971049283.51 root inner join, inner:TableReader_104, equal:[eq(Column#19, Column#35) eq(Column#18, Column#34)] - ├─HashLeftJoin_56 241379546.70 root inner join, inner:TableReader_102, equal:[eq(Column#17, Column#40)] - │ ├─HashLeftJoin_77 241379546.70 root inner join, inner:TableReader_100, equal:[eq(Column#18, Column#1)] - │ │ ├─HashRightJoin_80 300005811.00 root inner join, inner:HashRightJoin_91, equal:[eq(Column#10, Column#19)] - │ │ │ ├─HashRightJoin_91 500000.00 root inner join, inner:TableReader_95, equal:[eq(Column#49, Column#13)] - │ │ │ │ ├─TableReader_95 25.00 root data:TableScan_94 - │ │ │ │ │ └─TableScan_94 25.00 cop[tikv] table:nation, range:[-inf,+inf], keep order:false - │ │ │ │ └─TableReader_93 500000.00 root data:TableScan_92 - │ │ │ │ └─TableScan_92 500000.00 cop[tikv] table:supplier, range:[-inf,+inf], keep order:false - │ │ │ └─TableReader_97 300005811.00 root data:TableScan_96 - │ │ │ └─TableScan_96 300005811.00 cop[tikv] table:lineitem, range:[-inf,+inf], keep order:false - │ │ └─TableReader_100 8000000.00 root data:Selection_99 - │ │ └─Selection_99 8000000.00 cop[tikv] like(Column#2, "%dim%", 92) - │ │ └─TableScan_98 10000000.00 cop[tikv] table:part, range:[-inf,+inf], keep order:false - │ └─TableReader_102 75000000.00 root data:TableScan_101 - │ └─TableScan_101 75000000.00 cop[tikv] table:orders, range:[-inf,+inf], keep order:false - └─TableReader_104 40000000.00 root data:TableScan_103 - └─TableScan_103 40000000.00 cop[tikv] table:partsupp, range:[-inf,+inf], keep order:false + └─HashLeftJoin_44 971049283.51 root inner join, inner:TableReader_106, equal:[eq(Column#19, Column#35) eq(Column#18, Column#34)] + ├─HashLeftJoin_56 241379546.70 root inner join, inner:TableReader_104, equal:[eq(Column#17, Column#40)] + │ ├─HashLeftJoin_79 241379546.70 root inner join, inner:TableReader_102, equal:[eq(Column#18, Column#1)] + │ │ ├─HashRightJoin_82 300005811.00 root inner join, inner:HashRightJoin_93, equal:[eq(Column#10, Column#19)] + │ │ │ ├─HashRightJoin_93 500000.00 root inner join, inner:TableReader_97, equal:[eq(Column#49, Column#13)] + │ │ │ │ ├─TableReader_97 25.00 root data:TableScan_96 + │ │ │ │ │ └─TableScan_96 25.00 cop[tikv] table:nation, range:[-inf,+inf], keep order:false + │ │ │ │ └─TableReader_95 500000.00 root data:TableScan_94 + │ │ │ │ └─TableScan_94 500000.00 cop[tikv] table:supplier, range:[-inf,+inf], keep order:false + │ │ │ └─TableReader_99 300005811.00 root data:TableScan_98 + │ │ │ └─TableScan_98 300005811.00 cop[tikv] table:lineitem, range:[-inf,+inf], keep order:false + │ │ └─TableReader_102 8000000.00 root data:Selection_101 + │ │ └─Selection_101 8000000.00 cop[tikv] like(Column#2, "%dim%", 92) + │ │ └─TableScan_100 10000000.00 cop[tikv] table:part, range:[-inf,+inf], keep order:false + │ └─TableReader_104 75000000.00 root data:TableScan_103 + │ └─TableScan_103 75000000.00 cop[tikv] table:orders, range:[-inf,+inf], keep order:false + └─TableReader_106 40000000.00 root data:TableScan_105 + └─TableScan_105 40000000.00 cop[tikv] table:partsupp, range:[-inf,+inf], keep order:false /* Q10 Returned Item Reporting Query The query identifies customers who might be having problems with the parts that are shipped to them. @@ -1223,24 +1223,24 @@ Projection_25 1.00 root Column#2, Column#104 └─TopN_28 1.00 root Column#104:desc, Column#2:asc, offset:0, count:100 └─HashAgg_34 1.00 root group by:Column#2, funcs:count(1), firstrow(Column#2) └─IndexHashJoin_49 7828961.66 root anti semi join, inner:IndexLookUp_39, outer key:Column#8, inner key:Column#71, other cond:ne(Column#73, Column#10) - ├─IndexHashJoin_84 9786202.08 root semi join, inner:IndexLookUp_75, outer key:Column#8, inner key:Column#38, other cond:ne(Column#40, Column#1), ne(Column#40, Column#10) - │ ├─IndexMergeJoin_95 12232752.60 root inner join, inner:TableReader_93, outer key:Column#8, inner key:Column#25 - │ │ ├─HashRightJoin_101 12232752.60 root inner join, inner:HashRightJoin_114, equal:[eq(Column#1, Column#10)] - │ │ │ ├─HashRightJoin_114 20000.00 root inner join, inner:TableReader_119, equal:[eq(Column#34, Column#4)] - │ │ │ │ ├─TableReader_119 1.00 root data:Selection_118 - │ │ │ │ │ └─Selection_118 1.00 cop[tikv] eq(Column#35, "EGYPT") - │ │ │ │ │ └─TableScan_117 25.00 cop[tikv] table:nation, range:[-inf,+inf], keep order:false - │ │ │ │ └─TableReader_116 500000.00 root data:TableScan_115 - │ │ │ │ └─TableScan_115 500000.00 cop[tikv] table:supplier, range:[-inf,+inf], keep order:false - │ │ │ └─TableReader_122 240004648.80 root data:Selection_121 - │ │ │ └─Selection_121 240004648.80 cop[tikv] gt(Column#20, Column#19) - │ │ │ └─TableScan_120 300005811.00 cop[tikv] table:l1, range:[-inf,+inf], keep order:false - │ │ └─TableReader_93 0.80 root data:Selection_92 - │ │ └─Selection_92 0.80 cop[tikv] eq(Column#27, "F") - │ │ └─TableScan_91 1.00 cop[tikv] table:orders, range: decided by [Column#8], keep order:true - │ └─IndexLookUp_75 4.05 root - │ ├─IndexScan_73 4.05 cop[tikv] table:l2, index:L_ORDERKEY, L_LINENUMBER, range: decided by [eq(Column#38, Column#8)], keep order:false - │ └─TableScan_74 4.05 cop[tikv] table:l2, keep order:false + ├─IndexHashJoin_88 9786202.08 root semi join, inner:IndexLookUp_79, outer key:Column#8, inner key:Column#38, other cond:ne(Column#40, Column#1), ne(Column#40, Column#10) + │ ├─IndexMergeJoin_99 12232752.60 root inner join, inner:TableReader_97, outer key:Column#8, inner key:Column#25 + │ │ ├─HashRightJoin_105 12232752.60 root inner join, inner:HashRightJoin_118, equal:[eq(Column#1, Column#10)] + │ │ │ ├─HashRightJoin_118 20000.00 root inner join, inner:TableReader_123, equal:[eq(Column#34, Column#4)] + │ │ │ │ ├─TableReader_123 1.00 root data:Selection_122 + │ │ │ │ │ └─Selection_122 1.00 cop[tikv] eq(Column#35, "EGYPT") + │ │ │ │ │ └─TableScan_121 25.00 cop[tikv] table:nation, range:[-inf,+inf], keep order:false + │ │ │ │ └─TableReader_120 500000.00 root data:TableScan_119 + │ │ │ │ └─TableScan_119 500000.00 cop[tikv] table:supplier, range:[-inf,+inf], keep order:false + │ │ │ └─TableReader_126 240004648.80 root data:Selection_125 + │ │ │ └─Selection_125 240004648.80 cop[tikv] gt(Column#20, Column#19) + │ │ │ └─TableScan_124 300005811.00 cop[tikv] table:l1, range:[-inf,+inf], keep order:false + │ │ └─TableReader_97 0.80 root data:Selection_96 + │ │ └─Selection_96 0.80 cop[tikv] eq(Column#27, "F") + │ │ └─TableScan_95 1.00 cop[tikv] table:orders, range: decided by [Column#8], keep order:true + │ └─IndexLookUp_79 4.05 root + │ ├─IndexScan_77 4.05 cop[tikv] table:l2, index:L_ORDERKEY, L_LINENUMBER, range: decided by [eq(Column#38, Column#8)], keep order:false + │ └─TableScan_78 4.05 cop[tikv] table:l2, keep order:false └─IndexLookUp_39 3.24 root ├─IndexScan_36 4.05 cop[tikv] table:l3, index:L_ORDERKEY, L_LINENUMBER, range: decided by [eq(Column#71, Column#8)], keep order:false └─Selection_38 3.24 cop[tikv] gt(Column#83, Column#82) diff --git a/executor/builder.go b/executor/builder.go index cf9669028858f..9b8465268121a 100644 --- a/executor/builder.go +++ b/executor/builder.go @@ -1857,7 +1857,10 @@ func (b *executorBuilder) buildIndexLookUpMergeJoin(v *plannercore.PhysicalIndex func (b *executorBuilder) buildIndexNestedLoopHashJoin(v *plannercore.PhysicalIndexHashJoin) Executor { e := b.buildIndexLookUpJoin(&(v.PhysicalIndexJoin)).(*IndexLookUpJoin) - idxHash := &IndexNestedLoopHashJoin{IndexLookUpJoin: *e} + idxHash := &IndexNestedLoopHashJoin{ + IndexLookUpJoin: *e, + keepOuterOrder: v.KeepOuterOrder, + } concurrency := e.ctx.GetSessionVars().IndexLookupJoinConcurrency idxHash.joiners = make([]joiner, concurrency) for i := 0; i < concurrency; i++ { diff --git a/executor/index_lookup_hash_join.go b/executor/index_lookup_hash_join.go index bd4c2cd41104e..2750c2a5974ec 100644 --- a/executor/index_lookup_hash_join.go +++ b/executor/index_lookup_hash_join.go @@ -29,6 +29,16 @@ import ( "go.uber.org/zap" ) +// numResChkHold indicates the number of resource chunks that an inner worker +// holds at the same time. +// It's used in 2 cases individually: +// 1. IndexMergeJoin +// 2. IndexNestedLoopHashJoin: +// It's used when IndexNestedLoopHashJoin.keepOuterOrder is true. +// Otherwise, there will be at most `concurrency` resource chunks throughout +// the execution of IndexNestedLoopHashJoin. +const numResChkHold = 4 + // IndexNestedLoopHashJoin employs one outer worker and N inner workers to // execute concurrently. The output order is not promised. // @@ -45,15 +55,21 @@ type IndexNestedLoopHashJoin struct { IndexLookUpJoin resultCh chan *indexHashJoinResult joinChkResourceCh []chan *chunk.Chunk - // We build individual joiner for each inner worker when using chunk-based // execution, to avoid the concurrency of joiner.chk and joiner.selected. - joiners []joiner + joiners []joiner + keepOuterOrder bool + curTask *indexHashJoinTask + // taskCh is only used when `keepOuterOrder` is true. + taskCh chan *indexHashJoinTask } type indexHashJoinOuterWorker struct { outerWorker - innerCh chan *indexHashJoinTask + innerCh chan *indexHashJoinTask + keepOuterOrder bool + // taskCh is only used when the outer order needs to be promised. + taskCh chan *indexHashJoinTask } type indexHashJoinInnerWorker struct { @@ -61,11 +77,13 @@ type indexHashJoinInnerWorker struct { matchedOuterPtrs []chunk.RowPtr joiner joiner joinChkResourceCh chan *chunk.Chunk - resultCh chan *indexHashJoinResult - taskCh <-chan *indexHashJoinTask - wg *sync.WaitGroup - joinKeyBuf []byte - outerRowStatus []outerRowStatusFlag + // resultCh is valid only when indexNestedLoopHashJoin do not need to keep + // order. Otherwise, it will be nil. + resultCh chan *indexHashJoinResult + taskCh <-chan *indexHashJoinTask + wg *sync.WaitGroup + joinKeyBuf []byte + outerRowStatus []outerRowStatusFlag } type indexHashJoinResult struct { @@ -79,6 +97,15 @@ type indexHashJoinTask struct { outerRowStatus []outerRowStatusFlag lookupMap *rowHashMap err error + keepOuterOrder bool + // resultCh is only used when the outer order needs to be promised. + resultCh chan *indexHashJoinResult + // matchedInnerRowPtrs is only valid when the outer order needs to be + // promised. Otherwise, it will be nil. + // len(matchedInnerRowPtrs) equals to len(lookUpJoinTask.outerResult), and + // the elements of every row indicates the matched inner row ptrs of the + // corresponding outer row. + matchedInnerRowPtrs [][]chunk.RowPtr } // Open implements the IndexNestedLoopHashJoin Executor interface. @@ -121,16 +148,33 @@ func (e *IndexNestedLoopHashJoin) startWorkers(ctx context.Context) { workerCtx, cancelFunc := context.WithCancel(ctx) e.cancelFunc = cancelFunc innerCh := make(chan *indexHashJoinTask, concurrency) + if e.keepOuterOrder { + e.taskCh = make(chan *indexHashJoinTask, concurrency) + } e.workerWg.Add(1) ow := e.newOuterWorker(innerCh) go util.WithRecovery(func() { ow.run(workerCtx, cancelFunc) }, e.finishJoinWorkers) - e.resultCh = make(chan *indexHashJoinResult, concurrency) + if !e.keepOuterOrder { + e.resultCh = make(chan *indexHashJoinResult, concurrency) + } else { + // When `keepOuterOrder` is true, each task holds their own `resultCh` + // individually, thus we do not need a global resultCh. + e.resultCh = nil + } e.joinChkResourceCh = make([]chan *chunk.Chunk, concurrency) for i := int(0); i < concurrency; i++ { - e.joinChkResourceCh[i] = make(chan *chunk.Chunk, 1) - e.joinChkResourceCh[i] <- newFirstChunk(e) + if !e.keepOuterOrder { + e.joinChkResourceCh[i] = make(chan *chunk.Chunk, 1) + e.joinChkResourceCh[i] <- newFirstChunk(e) + } else { + e.joinChkResourceCh[i] = make(chan *chunk.Chunk, numResChkHold) + for j := 0; j < numResChkHold; j++ { + e.joinChkResourceCh[i] <- newFirstChunk(e) + } + } } + e.workerWg.Add(concurrency) for i := int(0); i < concurrency; i++ { workerID := i @@ -141,19 +185,31 @@ func (e *IndexNestedLoopHashJoin) startWorkers(ctx context.Context) { func (e *IndexNestedLoopHashJoin) finishJoinWorkers(r interface{}) { if r != nil { - e.resultCh <- &indexHashJoinResult{err: errors.Errorf("%v", r)} + logutil.BgLogger().Error("IndexNestedLoopHashJoin failed", zap.Error(errors.Errorf("%v", r))) + if e.cancelFunc != nil { + e.cancelFunc() + } } e.workerWg.Done() } func (e *IndexNestedLoopHashJoin) wait4JoinWorkers() { e.workerWg.Wait() - close(e.resultCh) + if e.resultCh != nil { + close(e.resultCh) + } + if e.taskCh != nil { + close(e.taskCh) + } } // Next implements the IndexNestedLoopHashJoin Executor interface. func (e *IndexNestedLoopHashJoin) Next(ctx context.Context, req *chunk.Chunk) error { req.Reset() + if e.keepOuterOrder { + return e.runInOrder(ctx, req) + } + // unordered run var ( result *indexHashJoinResult ok bool @@ -174,6 +230,50 @@ func (e *IndexNestedLoopHashJoin) Next(ctx context.Context, req *chunk.Chunk) er return nil } +func (e *IndexNestedLoopHashJoin) runInOrder(ctx context.Context, req *chunk.Chunk) error { + var ( + result *indexHashJoinResult + ok bool + ) + for { + if e.isDryUpTasks(ctx) { + return nil + } + select { + case result, ok = <-e.curTask.resultCh: + if !ok { + e.curTask = nil + continue + } + if result.err != nil { + return result.err + } + case <-ctx.Done(): + return nil + } + req.SwapColumns(result.chk) + result.src <- result.chk + return nil + } +} + +// isDryUpTasks indicates whether all the tasks have been processed. +func (e *IndexNestedLoopHashJoin) isDryUpTasks(ctx context.Context) bool { + if e.curTask != nil { + return false + } + var ok bool + select { + case e.curTask, ok = <-e.taskCh: + if !ok { + return true + } + case <-ctx.Done(): + return true + } + return false +} + // Close implements the IndexNestedLoopHashJoin Executor interface. func (e *IndexNestedLoopHashJoin) Close() error { if e.cancelFunc != nil { @@ -185,6 +285,11 @@ func (e *IndexNestedLoopHashJoin) Close() error { } e.resultCh = nil } + if e.taskCh != nil { + for range e.taskCh { + } + e.taskCh = nil + } for i := range e.joinChkResourceCh { close(e.joinChkResourceCh[i]) } @@ -207,6 +312,11 @@ func (ow *indexHashJoinOuterWorker) run(ctx context.Context, cancelFunc context. if finished := ow.pushToChan(ctx, task, ow.innerCh); finished { return } + if ow.keepOuterOrder { + if finished := ow.pushToChan(ctx, task, ow.taskCh); finished { + return + } + } } } @@ -215,9 +325,20 @@ func (ow *indexHashJoinOuterWorker) buildTask(ctx context.Context) (*indexHashJo if task == nil || err != nil { return nil, err } + var ( + resultCh chan *indexHashJoinResult + matchedInnerRowPtrs [][]chunk.RowPtr + ) + if ow.keepOuterOrder { + resultCh = make(chan *indexHashJoinResult, numResChkHold) + matchedInnerRowPtrs = make([][]chunk.RowPtr, task.outerResult.NumRows()) + } return &indexHashJoinTask{ - lookUpJoinTask: task, - outerRowStatus: make([]outerRowStatusFlag, task.outerResult.NumRows()), + lookUpJoinTask: task, + outerRowStatus: make([]outerRowStatusFlag, task.outerResult.NumRows()), + keepOuterOrder: ow.keepOuterOrder, + resultCh: resultCh, + matchedInnerRowPtrs: matchedInnerRowPtrs, }, nil } @@ -242,7 +363,9 @@ func (e *IndexNestedLoopHashJoin) newOuterWorker(innerCh chan *indexHashJoinTask parentMemTracker: e.memTracker, lookup: &e.IndexLookUpJoin, }, - innerCh: innerCh, + innerCh: innerCh, + keepOuterOrder: e.keepOuterOrder, + taskCh: e.taskCh, } return ow } @@ -280,7 +403,7 @@ func (iw *indexHashJoinInnerWorker) run(ctx context.Context, cancelFunc context. cancelFunc() return } - h := fnv.New64() + h, resultCh := fnv.New64(), iw.resultCh for { select { case <-ctx.Done(): @@ -294,7 +417,10 @@ func (iw *indexHashJoinInnerWorker) run(ctx context.Context, cancelFunc context. joinResult.err = task.err break } - err := iw.handleTask(ctx, cancelFunc, task, joinResult, h) + if task.keepOuterOrder { + resultCh = task.resultCh + } + err := iw.handleTask(ctx, cancelFunc, task, joinResult, h, resultCh) if err != nil { joinResult.err = err break @@ -305,9 +431,12 @@ func (iw *indexHashJoinInnerWorker) run(ctx context.Context, cancelFunc context. logutil.Logger(ctx).Error("indexHashJoinInnerWorker.run failed", zap.Error(joinResult.err)) return } - if joinResult.chk != nil && joinResult.chk.NumRows() > 0 { + // When task.keepOuterOrder is TRUE(resultCh != iw.resultCh), the last + // joinResult will be checked when the a task has been processed, thus we do + // not need to check it here again. + if resultCh == iw.resultCh && joinResult.chk != nil && joinResult.chk.NumRows() > 0 { select { - case iw.resultCh <- joinResult: + case resultCh <- joinResult: case <-ctx.Done(): return } @@ -372,7 +501,7 @@ func (iw *indexHashJoinInnerWorker) handleHashJoinInnerWorkerPanic(r interface{} iw.wg.Done() } -func (iw *indexHashJoinInnerWorker) handleTask(ctx context.Context, cancelFunc context.CancelFunc, task *indexHashJoinTask, joinResult *indexHashJoinResult, h hash.Hash64) error { +func (iw *indexHashJoinInnerWorker) handleTask(ctx context.Context, cancelFunc context.CancelFunc, task *indexHashJoinTask, joinResult *indexHashJoinResult, h hash.Hash64, resultCh chan *indexHashJoinResult) error { iw.wg = &sync.WaitGroup{} iw.wg.Add(1) // TODO(XuHuaiyu): we may always use the smaller side to build the hashtable. @@ -382,16 +511,19 @@ func (iw *indexHashJoinInnerWorker) handleTask(ctx context.Context, cancelFunc c return err } iw.wg.Wait() - return iw.doJoin(ctx, task, joinResult, h) + if !task.keepOuterOrder { + return iw.doJoinUnordered(ctx, task, joinResult, h, resultCh) + } + return iw.doJoinInOrder(ctx, task, joinResult, h, resultCh) } -func (iw *indexHashJoinInnerWorker) doJoin(ctx context.Context, task *indexHashJoinTask, joinResult *indexHashJoinResult, h hash.Hash64) error { +func (iw *indexHashJoinInnerWorker) doJoinUnordered(ctx context.Context, task *indexHashJoinTask, joinResult *indexHashJoinResult, h hash.Hash64, resultCh chan *indexHashJoinResult) error { var ok bool iter := chunk.NewIterator4List(task.innerResult) for row := iter.Begin(); row != iter.End(); row = iter.Next() { ok, joinResult = iw.joinMatchedInnerRow2Chunk(ctx, row, task, joinResult, h, iw.joinKeyBuf) if !ok { - return errors.New("indexHashJoinInnerWorker.handleTask failed") + return errors.New("indexHashJoinInnerWorker.doJoinUnordered failed") } } for rowIdx, val := range task.outerRowStatus { @@ -401,12 +533,12 @@ func (iw *indexHashJoinInnerWorker) doJoin(ctx context.Context, task *indexHashJ iw.joiner.onMissMatch(val == outerRowHasNull, task.outerResult.GetRow(rowIdx), joinResult.chk) if joinResult.chk.IsFull() { select { - case iw.resultCh <- joinResult: + case resultCh <- joinResult: case <-ctx.Done(): } joinResult, ok = iw.getNewJoinResult(ctx) if !ok { - return errors.New("indexHashJoinInnerWorker.handleTask failed") + return errors.New("indexHashJoinInnerWorker.doJoinUnordered failed") } } } @@ -482,3 +614,81 @@ func (iw *indexHashJoinInnerWorker) joinMatchedInnerRow2Chunk(ctx context.Contex } return true, joinResult } + +func (iw *indexHashJoinInnerWorker) collectMatchedInnerPtrs4OuterRows(ctx context.Context, innerRow chunk.Row, innerRowPtr chunk.RowPtr, + task *indexHashJoinTask, h hash.Hash64, buf []byte) error { + _, matchedOuterRowIdx, err := iw.getMatchedOuterRows(innerRow, task, h, buf) + if err != nil { + return err + } + for _, outerRowIdx := range matchedOuterRowIdx { + task.matchedInnerRowPtrs[outerRowIdx] = append(task.matchedInnerRowPtrs[outerRowIdx], innerRowPtr) + } + return nil +} + +// doJoinInOrder follows the following steps: +// 1. collect all the matched inner row ptrs for every outer row +// 2. do the join work +// 2.1 collect all the matched inner rows using the collected ptrs for every outer row +// 2.2 call tryToMatchInners for every outer row +// 2.3 call onMissMatch when no inner rows are matched +func (iw *indexHashJoinInnerWorker) doJoinInOrder(ctx context.Context, task *indexHashJoinTask, joinResult *indexHashJoinResult, h hash.Hash64, resultCh chan *indexHashJoinResult) (err error) { + defer func() { + if err == nil && joinResult.chk != nil { + if joinResult.chk.NumRows() > 0 { + select { + case resultCh <- joinResult: + case <-ctx.Done(): + return + } + } else { + joinResult.src <- joinResult.chk + } + } + close(resultCh) + }() + for i, numChunks := 0, task.innerResult.NumChunks(); i < numChunks; i++ { + for j, chk := 0, task.innerResult.GetChunk(i); j < chk.NumRows(); j++ { + row := chk.GetRow(j) + ptr := chunk.RowPtr{ChkIdx: uint32(i), RowIdx: uint32(j)} + err = iw.collectMatchedInnerPtrs4OuterRows(ctx, row, ptr, task, h, iw.joinKeyBuf) + if err != nil { + return err + } + } + } + // TODO: matchedInnerRowPtrs and matchedInnerRows can be moved to inner worker. + matchedInnerRows := make([]chunk.Row, len(task.matchedInnerRowPtrs)) + var hasMatched, hasNull, ok bool + for outerRowIdx, innerRowPtrs := range task.matchedInnerRowPtrs { + matchedInnerRows, hasMatched, hasNull = matchedInnerRows[:0], false, false + outerRow := task.outerResult.GetRow(outerRowIdx) + for _, ptr := range innerRowPtrs { + matchedInnerRows = append(matchedInnerRows, task.innerResult.GetRow(ptr)) + } + iter := chunk.NewIterator4Slice(matchedInnerRows) + for iter.Begin(); iter.Current() != iter.End(); { + matched, isNull, err := iw.joiner.tryToMatchInners(outerRow, iter, joinResult.chk) + if err != nil { + return err + } + hasMatched, hasNull = matched || hasMatched, isNull || hasNull + if joinResult.chk.IsFull() { + select { + case resultCh <- joinResult: + case <-ctx.Done(): + return nil + } + joinResult, ok = iw.getNewJoinResult(ctx) + if !ok { + return errors.New("indexHashJoinInnerWorker.doJoinInOrder failed") + } + } + } + if !hasMatched { + iw.joiner.onMissMatch(hasNull, outerRow, joinResult.chk) + } + } + return nil +} diff --git a/executor/index_lookup_merge_join.go b/executor/index_lookup_merge_join.go index 19e5eab58b3ac..a223fc174e53b 100644 --- a/executor/index_lookup_merge_join.go +++ b/executor/index_lookup_merge_join.go @@ -32,8 +32,6 @@ import ( "github.com/pingcap/tidb/util/stringutil" ) -const numResChkHold = 4 - // IndexLookUpMergeJoin realizes IndexLookUpJoin by merge join // It preserves the order of the outer table and support batch lookup. // diff --git a/executor/join_test.go b/executor/join_test.go index 835cef9317ee5..ff39a751ad6f6 100644 --- a/executor/join_test.go +++ b/executor/join_test.go @@ -16,6 +16,7 @@ package executor_test import ( "context" "fmt" + "math/rand" "strings" "time" @@ -1019,6 +1020,40 @@ func (s *testSuite2) TestIndexLookupJoin(c *C) { )) } +func (s *testSuite2) TestIndexNestedLoopHashJoin(c *C) { + tk := testkit.NewTestKit(c, s.store) + tk.MustExec("use test") + tk.MustExec("set @@tidb_init_chunk_size=2") + tk.MustExec("set @@tidb_index_join_batch_size=10") + tk.MustExec("DROP TABLE IF EXISTS t, s") + tk.MustExec("create table t(pk int primary key, a int)") + for i := 0; i < 100; i++ { + tk.MustExec(fmt.Sprintf("insert into t values(%d, %d)", i, i)) + } + tk.MustExec("create table s(a int primary key)") + for i := 0; i < 100; i++ { + if rand.Float32() < 0.3 { + tk.MustExec(fmt.Sprintf("insert into s values(%d)", i)) + } else { + tk.MustExec(fmt.Sprintf("insert into s values(%d)", i*100)) + } + } + tk.MustExec("analyze table t") + tk.MustExec("analyze table s") + // Test IndexNestedLoopHashJoin keepOrder. + tk.MustQuery("explain select /*+ TIDB_INLJ(s) */ * from t left join s on t.a=s.a order by t.pk").Check(testkit.Rows( + "IndexHashJoin_28 100.00 root left outer join, inner:TableReader_22, outer key:Column#2, inner key:Column#3", + "├─TableReader_30 100.00 root data:TableScan_29", + "│ └─TableScan_29 100.00 cop[tikv] table:t, range:[-inf,+inf], keep order:true", + "└─TableReader_22 1.00 root data:TableScan_21", + " └─TableScan_21 1.00 cop[tikv] table:s, range: decided by [Column#2], keep order:false", + )) + rs := tk.MustQuery("select /*+ TIDB_INLJ(s) */ * from t left join s on t.a=s.a order by t.pk") + for i, row := range rs.Rows() { + c.Assert(row[0].(string), Equals, fmt.Sprintf("%d", i)) + } +} + func (s *testSuite2) TestMergejoinOrder(c *C) { tk := testkit.NewTestKit(c, s.store) tk.MustExec("use test") diff --git a/planner/core/exhaust_physical_plans.go b/planner/core/exhaust_physical_plans.go index d56a65f7a543c..da08fd52046d9 100644 --- a/planner/core/exhaust_physical_plans.go +++ b/planner/core/exhaust_physical_plans.go @@ -437,17 +437,15 @@ func (p *LogicalJoin) constructIndexHashJoin( path *accessPath, compareFilters *ColWithCmpFuncManager, ) []PhysicalPlan { - // TODO(xuhuaiyu): support keep outer order for indexHashJoin. - if !prop.IsEmpty() { - return nil - } indexJoins := p.constructIndexJoin(prop, outerIdx, innerTask, ranges, keyOff2IdxOff, path, compareFilters) indexHashJoins := make([]PhysicalPlan, 0, len(indexJoins)) for _, plan := range indexJoins { join := plan.(*PhysicalIndexJoin) indexHashJoin := PhysicalIndexHashJoin{ PhysicalIndexJoin: *join, - keepOuterOrder: false, + // Prop is empty means that the parent operator does not need the + // join operator to provide any promise of the output order. + KeepOuterOrder: !prop.IsEmpty(), }.Init(p.ctx) indexHashJoins = append(indexHashJoins, indexHashJoin) } diff --git a/planner/core/physical_plans.go b/planner/core/physical_plans.go index 4323672ff8224..53abd7215c433 100644 --- a/planner/core/physical_plans.go +++ b/planner/core/physical_plans.go @@ -309,9 +309,9 @@ type PhysicalIndexMergeJoin struct { // PhysicalIndexHashJoin represents the plan of index look up hash join. type PhysicalIndexHashJoin struct { PhysicalIndexJoin - // keepOuterOrder indicates whether keeping the output result order as the + // KeepOuterOrder indicates whether keeping the output result order as the // outer side. - keepOuterOrder bool + KeepOuterOrder bool } // PhysicalMergeJoin represents merge join implementation of LogicalJoin.