Skip to content

Commit

Permalink
executor: refine executor code (pingcap#52959)
Browse files: browse the repository at this point in the history
  • Loading branch information
windtalker authored and terry1purcell committed May 17, 2024
1 parent 825d466 commit dd208b6
Show file tree
Hide file tree
Showing 34 changed files with 1,545 additions and 1,349 deletions.
20 changes: 3 additions & 17 deletions pkg/executor/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ go_library(
"checksum.go",
"compact_table.go",
"compiler.go",
"concurrent_map.go",
"coprocessor.go",
"cte.go",
"cte_table_reader.go",
Expand All @@ -34,12 +33,8 @@ go_library(
"explain.go",
"foreign_key.go",
"grant.go",
"hash_table.go",
"import_into.go",
"index_advise.go",
"index_lookup_hash_join.go",
"index_lookup_join.go",
"index_lookup_merge_join.go",
"index_merge_reader.go",
"infoschema_reader.go",
"insert.go",
Expand All @@ -48,13 +43,10 @@ go_library(
"inspection_profile.go",
"inspection_result.go",
"inspection_summary.go",
"join.go",
"joiner.go",
"load_data.go",
"load_stats.go",
"mem_reader.go",
"memtable_reader.go",
"merge_join.go",
"metrics_reader.go",
"mpp_gather.go",
"opt_rule_blacklist.go",
Expand Down Expand Up @@ -125,9 +117,11 @@ go_library(
"//pkg/executor/internal/testutil",
"//pkg/executor/internal/util",
"//pkg/executor/internal/vecgroupchecker",
"//pkg/executor/join",
"//pkg/executor/lockstats",
"//pkg/executor/metrics",
"//pkg/executor/sortexec",
"//pkg/executor/unionexec",
"//pkg/expression",
"//pkg/expression/aggregation",
"//pkg/expression/context",
Expand Down Expand Up @@ -189,7 +183,6 @@ go_library(
"//pkg/types/parser_driver",
"//pkg/util",
"//pkg/util/admin",
"//pkg/util/bitmap",
"//pkg/util/breakpoint",
"//pkg/util/channel",
"//pkg/util/chunk",
Expand Down Expand Up @@ -217,7 +210,6 @@ go_library(
"//pkg/util/logutil/consistency",
"//pkg/util/mathutil",
"//pkg/util/memory",
"//pkg/util/mvmap",
"//pkg/util/password-validation",
"//pkg/util/plancodec",
"//pkg/util/printer",
Expand Down Expand Up @@ -313,7 +305,6 @@ go_test(
"chunk_size_control_test.go",
"cluster_table_test.go",
"compact_table_test.go",
"concurrent_map_test.go",
"copr_cache_test.go",
"cte_test.go",
"delete_test.go",
Expand All @@ -327,24 +318,19 @@ go_test(
"explain_unit_test.go",
"explainfor_test.go",
"grant_test.go",
"hash_table_test.go",
"historical_stats_test.go",
"hot_regions_history_table_test.go",
"import_into_test.go",
"index_advise_test.go",
"index_lookup_join_test.go",
"index_lookup_merge_join_test.go",
"infoschema_cluster_table_test.go",
"infoschema_reader_internal_test.go",
"infoschema_reader_test.go",
"insert_test.go",
"inspection_result_test.go",
"inspection_summary_test.go",
"join_pkg_test.go",
"joiner_test.go",
"main_test.go",
"memtable_reader_test.go",
"merge_join_test.go",
"metrics_reader_test.go",
"parallel_apply_test.go",
"partition_table_test.go",
Expand Down Expand Up @@ -398,6 +384,7 @@ go_test(
"//pkg/executor/internal/builder",
"//pkg/executor/internal/exec",
"//pkg/executor/internal/testutil",
"//pkg/executor/join",
"//pkg/executor/sortexec",
"//pkg/expression",
"//pkg/expression/aggregation",
Expand Down Expand Up @@ -454,7 +441,6 @@ go_test(
"//pkg/util/disk",
"//pkg/util/execdetails",
"//pkg/util/gcutil",
"//pkg/util/hack",
"//pkg/util/logutil",
"//pkg/util/memory",
"//pkg/util/mock",
Expand Down
169 changes: 85 additions & 84 deletions pkg/executor/benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/pingcap/tidb/pkg/executor/aggregate"
"github.com/pingcap/tidb/pkg/executor/internal/exec"
"github.com/pingcap/tidb/pkg/executor/internal/testutil"
"github.com/pingcap/tidb/pkg/executor/join"
"github.com/pingcap/tidb/pkg/executor/sortexec"
"github.com/pingcap/tidb/pkg/expression"
"github.com/pingcap/tidb/pkg/expression/aggregation"
Expand Down Expand Up @@ -657,7 +658,7 @@ func prepareResolveIndices(joinSchema, lSchema, rSchema *expression.Schema, join
return joinSchema
}

func prepare4HashJoin(testCase *hashJoinTestCase, innerExec, outerExec exec.Executor) *HashJoinExec {
func prepare4HashJoin(testCase *hashJoinTestCase, innerExec, outerExec exec.Executor) *join.HashJoinExec {
if testCase.useOuterToBuild {
innerExec, outerExec = outerExec, innerExec
}
Expand Down Expand Up @@ -689,41 +690,41 @@ func prepare4HashJoin(testCase *hashJoinTestCase, innerExec, outerExec exec.Exec
joinKeysColIdx = append(joinKeysColIdx, testCase.keyIdx...)
probeKeysColIdx := make([]int, 0, len(testCase.keyIdx))
probeKeysColIdx = append(probeKeysColIdx, testCase.keyIdx...)
e := &HashJoinExec{
e := &join.HashJoinExec{
BaseExecutor: exec.NewBaseExecutor(testCase.ctx, joinSchema, 5, innerExec, outerExec),
hashJoinCtx: &hashJoinCtx{
sessCtx: testCase.ctx,
joinType: testCase.joinType, // 0 for InnerJoin, 1 for LeftOuterJoin, 2 for RightOuterJoin
isOuterJoin: false,
useOuterToBuild: testCase.useOuterToBuild,
concurrency: uint(testCase.concurrency),
probeTypes: exec.RetTypes(outerExec),
buildTypes: exec.RetTypes(innerExec),
allocPool: chunk.NewEmptyAllocator(),
HashJoinCtx: &join.HashJoinCtx{
SessCtx: testCase.ctx,
JoinType: testCase.joinType, // 0 for InnerJoin, 1 for LeftOuterJoin, 2 for RightOuterJoin
IsOuterJoin: false,
UseOuterToBuild: testCase.useOuterToBuild,
Concurrency: uint(testCase.concurrency),
ProbeTypes: exec.RetTypes(outerExec),
BuildTypes: exec.RetTypes(innerExec),
ChunkAllocPool: chunk.NewEmptyAllocator(),
},
probeSideTupleFetcher: &probeSideTupleFetcher{
probeSideExec: outerExec,
ProbeSideTupleFetcher: &join.ProbeSideTupleFetcher{
ProbeSideExec: outerExec,
},
probeWorkers: make([]*probeWorker, testCase.concurrency),
buildWorker: &buildWorker{
buildKeyColIdx: joinKeysColIdx,
buildSideExec: innerExec,
ProbeWorkers: make([]*join.ProbeWorker, testCase.concurrency),
BuildWorker: &join.BuildWorker{
BuildKeyColIdx: joinKeysColIdx,
BuildSideExec: innerExec,
},
}

childrenUsedSchema := markChildrenUsedColsForTest(testCase.ctx, e.Schema(), e.Children(0).Schema(), e.Children(1).Schema())
defaultValues := make([]types.Datum, e.buildWorker.buildSideExec.Schema().Len())
defaultValues := make([]types.Datum, e.BuildWorker.BuildSideExec.Schema().Len())
lhsTypes, rhsTypes := exec.RetTypes(innerExec), exec.RetTypes(outerExec)
for i := uint(0); i < e.concurrency; i++ {
e.probeWorkers[i] = &probeWorker{
workerID: i,
hashJoinCtx: e.hashJoinCtx,
joiner: newJoiner(testCase.ctx, e.joinType, true, defaultValues,
for i := uint(0); i < e.Concurrency; i++ {
e.ProbeWorkers[i] = &join.ProbeWorker{
WorkerID: i,
HashJoinCtx: e.HashJoinCtx,
Joiner: join.NewJoiner(testCase.ctx, e.JoinType, true, defaultValues,
nil, lhsTypes, rhsTypes, childrenUsedSchema, false),
probeKeyColIdx: probeKeysColIdx,
ProbeKeyColIdx: probeKeysColIdx,
}
}
e.buildWorker.hashJoinCtx = e.hashJoinCtx
e.BuildWorker.HashJoinCtx = e.HashJoinCtx
memLimit := int64(-1)
if testCase.disk {
memLimit = 1
Expand Down Expand Up @@ -832,7 +833,7 @@ func benchmarkHashJoinExec(b *testing.B, casTest *hashJoinTestCase, opt1, opt2 *

if testResult {
time.Sleep(200 * time.Millisecond)
if spilled := executor.rowContainer.alreadySpilledSafeForTest(); spilled != casTest.disk {
if spilled := executor.RowContainer.AlreadySpilledSafeForTest(); spilled != casTest.disk {
b.Fatal("wrong usage with disk:", spilled, casTest.disk)
}
}
Expand Down Expand Up @@ -1021,7 +1022,7 @@ func benchmarkBuildHashTable(b *testing.B, casTest *hashJoinTestCase, dataSource
if err := exec.Open(tmpCtx); err != nil {
b.Fatal(err)
}
exec.prepared = true
exec.Prepared = true

innerResultCh := make(chan *chunk.Chunk, len(dataSource1.Chunks))
for _, chk := range dataSource1.Chunks {
Expand All @@ -1030,13 +1031,13 @@ func benchmarkBuildHashTable(b *testing.B, casTest *hashJoinTestCase, dataSource
close(innerResultCh)

b.StartTimer()
if err := exec.buildWorker.buildHashTableForList(innerResultCh); err != nil {
if err := exec.BuildWorker.BuildHashTableForList(innerResultCh); err != nil {
b.Fatal(err)
}

if testResult {
time.Sleep(200 * time.Millisecond)
if exec.rowContainer.alreadySpilledSafeForTest() != casTest.disk {
if exec.RowContainer.AlreadySpilledSafeForTest() != casTest.disk {
b.Fatal("wrong usage with disk")
}
}
Expand Down Expand Up @@ -1165,27 +1166,27 @@ func prepare4IndexInnerHashJoin(tc *IndexJoinTestCase, outerDS *testutil.MockDat
return nil, err
}

e := &IndexLookUpJoin{
e := &join.IndexLookUpJoin{
BaseExecutor: exec.NewBaseExecutor(tc.Ctx, joinSchema, 1, outerDS),
outerCtx: outerCtx{
rowTypes: leftTypes,
keyCols: tc.OuterJoinKeyIdx,
hashCols: tc.OuterHashKeyIdx,
OuterCtx: join.OuterCtx{
RowTypes: leftTypes,
KeyCols: tc.OuterJoinKeyIdx,
HashCols: tc.OuterHashKeyIdx,
},
innerCtx: innerCtx{
readerBuilder: readerBuilder,
rowTypes: rightTypes,
colLens: colLens,
keyCols: tc.InnerJoinKeyIdx,
hashCols: tc.InnerHashKeyIdx,
InnerCtx: join.InnerCtx{
ReaderBuilder: readerBuilder,
RowTypes: rightTypes,
ColLens: colLens,
KeyCols: tc.InnerJoinKeyIdx,
HashCols: tc.InnerHashKeyIdx,
},
workerWg: new(sync.WaitGroup),
joiner: newJoiner(tc.Ctx, 0, false, defaultValues, nil, leftTypes, rightTypes, nil, false),
isOuterJoin: false,
keyOff2IdxOff: keyOff2IdxOff,
lastColHelper: nil,
WorkerWg: new(sync.WaitGroup),
Joiner: join.NewJoiner(tc.Ctx, 0, false, defaultValues, nil, leftTypes, rightTypes, nil, false),
IsOuterJoin: false,
KeyOff2IdxOff: keyOff2IdxOff,
LastColHelper: nil,
}
e.joinResult = exec.NewFirstChunk(e)
e.JoinResult = exec.NewFirstChunk(e)
return e, nil
}

Expand All @@ -1194,11 +1195,11 @@ func prepare4IndexOuterHashJoin(tc *IndexJoinTestCase, outerDS *testutil.MockDat
if err != nil {
return nil, err
}
idxHash := &IndexNestedLoopHashJoin{IndexLookUpJoin: *e.(*IndexLookUpJoin)}
idxHash := &join.IndexNestedLoopHashJoin{IndexLookUpJoin: *e.(*join.IndexLookUpJoin)}
concurrency := tc.Concurrency
idxHash.joiners = make([]joiner, concurrency)
idxHash.Joiners = make([]join.Joiner, concurrency)
for i := 0; i < concurrency; i++ {
idxHash.joiners[i] = e.(*IndexLookUpJoin).joiner.Clone()
idxHash.Joiners[i] = e.(*join.IndexLookUpJoin).Joiner.Clone()
}
return idxHash, nil
}
Expand Down Expand Up @@ -1239,34 +1240,34 @@ func prepare4IndexMergeJoin(tc *IndexJoinTestCase, outerDS *testutil.MockDataSou
return nil, err
}

e := &IndexLookUpMergeJoin{
e := &join.IndexLookUpMergeJoin{
BaseExecutor: exec.NewBaseExecutor(tc.Ctx, joinSchema, 2, outerDS),
outerMergeCtx: outerMergeCtx{
rowTypes: leftTypes,
keyCols: tc.OuterJoinKeyIdx,
joinKeys: outerJoinKeys,
needOuterSort: tc.NeedOuterSort,
compareFuncs: outerCompareFuncs,
OuterMergeCtx: join.OuterMergeCtx{
RowTypes: leftTypes,
KeyCols: tc.OuterJoinKeyIdx,
JoinKeys: outerJoinKeys,
NeedOuterSort: tc.NeedOuterSort,
CompareFuncs: outerCompareFuncs,
},
innerMergeCtx: innerMergeCtx{
readerBuilder: readerBuilder,
rowTypes: rightTypes,
joinKeys: innerJoinKeys,
colLens: colLens,
keyCols: tc.InnerJoinKeyIdx,
compareFuncs: compareFuncs,
InnerMergeCtx: join.InnerMergeCtx{
ReaderBuilder: readerBuilder,
RowTypes: rightTypes,
JoinKeys: innerJoinKeys,
ColLens: colLens,
KeyCols: tc.InnerJoinKeyIdx,
CompareFuncs: compareFuncs,
},
workerWg: new(sync.WaitGroup),
isOuterJoin: false,
keyOff2IdxOff: keyOff2IdxOff,
lastColHelper: nil,
WorkerWg: new(sync.WaitGroup),
IsOuterJoin: false,
KeyOff2IdxOff: keyOff2IdxOff,
LastColHelper: nil,
}
concurrency := e.Ctx().GetSessionVars().IndexLookupJoinConcurrency()
joiners := make([]joiner, concurrency)
joiners := make([]join.Joiner, concurrency)
for i := 0; i < concurrency; i++ {
joiners[i] = newJoiner(tc.Ctx, 0, false, defaultValues, nil, leftTypes, rightTypes, nil, false)
joiners[i] = join.NewJoiner(tc.Ctx, 0, false, defaultValues, nil, leftTypes, rightTypes, nil, false)
}
e.joiners = joiners
e.Joiners = joiners
return e, nil
}

Expand Down Expand Up @@ -1365,13 +1366,13 @@ type mergeJoinTestCase struct {
}

func prepareMergeJoinExec(tc *mergeJoinTestCase, joinSchema *expression.Schema, leftExec, rightExec exec.Executor, defaultValues []types.Datum,
compareFuncs []expression.CompareFunc, innerJoinKeys []*expression.Column, outerJoinKeys []*expression.Column) *MergeJoinExec {
compareFuncs []expression.CompareFunc, innerJoinKeys []*expression.Column, outerJoinKeys []*expression.Column) *join.MergeJoinExec {
// only benchmark inner join
mergeJoinExec := &MergeJoinExec{
stmtCtx: tc.Ctx.GetSessionVars().StmtCtx,
mergeJoinExec := &join.MergeJoinExec{
StmtCtx: tc.Ctx.GetSessionVars().StmtCtx,
BaseExecutor: exec.NewBaseExecutor(tc.Ctx, joinSchema, 3, leftExec, rightExec),
compareFuncs: compareFuncs,
isOuterJoin: false,
CompareFuncs: compareFuncs,
IsOuterJoin: false,
}

var usedIdx [][]int
Expand All @@ -1388,7 +1389,7 @@ func prepareMergeJoinExec(tc *mergeJoinTestCase, joinSchema *expression.Schema,
}
}

mergeJoinExec.joiner = newJoiner(
mergeJoinExec.Joiner = join.NewJoiner(
tc.Ctx,
0,
false,
Expand All @@ -1400,16 +1401,16 @@ func prepareMergeJoinExec(tc *mergeJoinTestCase, joinSchema *expression.Schema,
false,
)

mergeJoinExec.innerTable = &mergeJoinTable{
isInner: true,
childIndex: 1,
joinKeys: innerJoinKeys,
mergeJoinExec.InnerTable = &join.MergeJoinTable{
IsInner: true,
ChildIndex: 1,
JoinKeys: innerJoinKeys,
}

mergeJoinExec.outerTable = &mergeJoinTable{
childIndex: 0,
filters: nil,
joinKeys: outerJoinKeys,
mergeJoinExec.OuterTable = &join.MergeJoinTable{
ChildIndex: 0,
Filters: nil,
JoinKeys: outerJoinKeys,
}

return mergeJoinExec
Expand Down

0 comments on commit dd208b6

Please sign in to comment.