executor: implement disk-based hash join (#12067)

SunRunAway committed Sep 24, 2019
1 parent 1fe9773 commit 1f92255

Showing 17 changed files with 454 additions and 78 deletions.
2 changes: 2 additions & 0 deletions config/config.go
@@ -67,6 +67,7 @@ type Config struct {
RunDDL bool `toml:"run-ddl" json:"run-ddl"`
SplitTable bool `toml:"split-table" json:"split-table"`
TokenLimit uint `toml:"token-limit" json:"token-limit"`
+OOMUseTmpStorage bool `toml:"oom-use-tmp-storage" json:"oom-use-tmp-storage"`
OOMAction string `toml:"oom-action" json:"oom-action"`
MemQuotaQuery int64 `toml:"mem-quota-query" json:"mem-quota-query"`
EnableStreaming bool `toml:"enable-streaming" json:"enable-streaming"`
@@ -336,6 +337,7 @@ var defaultConf = Config{
SplitTable: true,
Lease: "45s",
TokenLimit: 1000,
+OOMUseTmpStorage: true,
OOMAction: "log",
MemQuotaQuery: 32 << 30,
EnableStreaming: false,
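The new field defaults to true in defaultConf, so spilling is available unless explicitly disabled. As a minimal sketch of how a component could consult the flag, assuming TiDB's config.GetGlobalConfig accessor (the spillEnabled helper itself is hypothetical):

    package main

    import (
        "fmt"

        "github.com/pingcap/tidb/config"
    )

    // spillEnabled is a hypothetical helper: it reports whether executors
    // may fall back to temporary disk once mem-quota-query is exceeded.
    func spillEnabled() bool {
        return config.GetGlobalConfig().OOMUseTmpStorage
    }

    func main() {
        fmt.Println("disk spill allowed:", spillEnabled()) // true with defaults
    }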
11 changes: 7 additions & 4 deletions config/config.toml.example
@@ -31,13 +31,16 @@ split-table = true
# The limit of concurrent executed sessions.
token-limit = 1000

-# Only print a log when out of memory quota.
-# Valid options: ["log", "cancel"]
-oom-action = "log"

# Set the memory quota for a query in bytes. Default: 32GB
mem-quota-query = 34359738368

+# Set to true to enable use of temporary disk for some executors when mem-quota-query is exceeded.
+oom-use-tmp-storage = true
+
+# What to do when mem-quota-query is exceeded and cannot be spilled to disk anymore.
+# Valid options: ["log", "cancel"]
+oom-action = "log"

# Enable coprocessor streaming.
enable-streaming = false

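Read together, the two knobs form a two-stage policy: when a query's tracked memory exceeds mem-quota-query, eligible executors first spill to temporary disk (if oom-use-tmp-storage is true), and oom-action applies only once spilling is impossible or exhausted. A sketch of that decision order; the function and its parameters are illustrative, not TiDB internals:

    package main

    import "fmt"

    // handleQuotaExceeded sketches the documented decision order for a
    // query that has gone over mem-quota-query. All names are illustrative.
    func handleQuotaExceeded(useTmpStorage, canStillSpill bool, oomAction string) string {
        if useTmpStorage && canStillSpill {
            return "spill executor state to temporary disk"
        }
        switch oomAction {
        case "cancel":
            return "cancel the query"
        default: // "log", the default
            return "write a log entry and keep going"
        }
    }

    func main() {
        fmt.Println(handleQuotaExceeded(true, true, "log"))     // spills first
        fmt.Println(handleQuotaExceeded(true, false, "cancel")) // no spill room left: cancel
    }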
88 changes: 65 additions & 23 deletions executor/benchmark_test.go
@@ -21,6 +21,7 @@ import (
"strings"
"testing"

"github.com/pingcap/log"
"github.com/pingcap/parser/ast"
"github.com/pingcap/parser/mysql"
"github.com/pingcap/tidb/expression"
@@ -34,6 +35,7 @@ import (
"github.com/pingcap/tidb/util/memory"
"github.com/pingcap/tidb/util/mock"
"github.com/pingcap/tidb/util/stringutil"
"go.uber.org/zap/zapcore"
)

var (
@@ -522,6 +524,7 @@ type hashJoinTestCase struct {
concurrency int
ctx sessionctx.Context
keyIdx []int
+disk bool
}

func (tc hashJoinTestCase) columns() []*expression.Column {
@@ -532,8 +535,8 @@ func (tc hashJoinTestCase) columns() []*expression.Column {
}

func (tc hashJoinTestCase) String() string {
return fmt.Sprintf("(rows:%v, concurency:%v, joinKeyIdx: %v)",
tc.rows, tc.concurrency, tc.keyIdx)
return fmt.Sprintf("(rows:%v, concurency:%v, joinKeyIdx: %v, disk:%v)",
tc.rows, tc.concurrency, tc.keyIdx, tc.disk)
}

func defaultHashJoinTestCase() *hashJoinTestCase {
@@ -572,6 +575,13 @@ func prepare4Join(testCase *hashJoinTestCase, innerExec, outerExec Executor) *HashJoinExec {
e.joiners[i] = newJoiner(testCase.ctx, e.joinType, true, defaultValues,
nil, lhsTypes, rhsTypes)
}
+memLimit := int64(-1)
+if testCase.disk {
+memLimit = 1
+}
+t := memory.NewTracker(stringutil.StringerStr("root of prepare4Join"), memLimit)
+t.SetActionOnExceed(nil)
+e.ctx.GetSessionVars().StmtCtx.MemTracker = t
return e
}
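The quota values are what route a test case down one path or the other: -1 leaves the tracker unlimited, while a 1-byte limit guarantees that the very first tracked allocation overflows the quota and forces the spill path. A minimal sketch of the tracker mechanics, assuming the util/memory API used in this diff (NewTracker, SetActionOnExceed) plus LogOnExceed and BytesConsumed from the same package:

    package main

    import (
        "fmt"

        "github.com/pingcap/tidb/util/memory"
        "github.com/pingcap/tidb/util/stringutil"
    )

    func main() {
        // A small quota for demonstration; prepare4Join uses -1 (no limit)
        // for in-memory cases and 1 byte to force spilling.
        t := memory.NewTracker(stringutil.StringerStr("demo"), 1024)
        t.SetActionOnExceed(&memory.LogOnExceed{}) // log, rather than panic, on excess
        t.Consume(512)                             // attribute 512 tracked bytes to this tracker
        fmt.Println("tracked bytes:", t.BytesConsumed())
    }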

@@ -620,10 +630,17 @@ func benchmarkHashJoinExecWithCase(b *testing.B, casTest *hashJoinTestCase) {
b.Fatal(err)
}
b.StopTimer()
+if exec.rowContainer.alreadySpilled() != casTest.disk {
+b.Fatal("wrong usage with disk")
+}
}
}

func BenchmarkHashJoinExec(b *testing.B) {
+lvl := log.GetLevel()
+log.SetLevel(zapcore.ErrorLevel)
+defer log.SetLevel(lvl)

b.ReportAllocs()
cas := defaultHashJoinTestCase()
b.Run(fmt.Sprintf("%v", cas), func(b *testing.B) {
@@ -634,6 +651,19 @@ func BenchmarkHashJoinExec(b *testing.B) {
b.Run(fmt.Sprintf("%v", cas), func(b *testing.B) {
benchmarkHashJoinExecWithCase(b, cas)
})

+cas.keyIdx = []int{0}
+cas.disk = true
+b.Run(fmt.Sprintf("%v", cas), func(b *testing.B) {
+benchmarkHashJoinExecWithCase(b, cas)
+})

+cas.keyIdx = []int{0}
+cas.disk = true
+cas.rows = 1000
+b.Run(fmt.Sprintf("%v", cas), func(b *testing.B) {
+benchmarkHashJoinExecWithCase(b, cas)
+})
}
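Each benchmark iteration now asserts that exec.rowContainer.alreadySpilled() matches casTest.disk, i.e. a run must spill exactly when the case asked for a disk-backed run. The invariant behind that check, as a self-contained sketch (illustrative only; TiDB's actual row container is more involved):

    package main

    import "fmt"

    // spillSketch mimics the contract the benchmarks verify: rows stay in
    // memory until a quota is exceeded, after which the container is
    // permanently considered spilled.
    type spillSketch struct {
        memUsed  int64
        memLimit int64 // negative means "no quota"
        spilled  bool
    }

    func (c *spillSketch) add(rowBytes int64) {
        c.memUsed += rowBytes
        if c.memLimit >= 0 && c.memUsed > c.memLimit {
            c.spilled = true // from here on, rows would go to temporary disk
        }
    }

    func (c *spillSketch) alreadySpilled() bool { return c.spilled }

    func main() {
        c := &spillSketch{memLimit: 1} // the 1-byte quota used by disk cases
        c.add(64)
        fmt.Println("spilled:", c.alreadySpilled()) // true, as a disk case expects
    }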

func benchmarkBuildHashTableForList(b *testing.B, casTest *hashJoinTestCase) {
@@ -656,16 +686,16 @@ func benchmarkBuildHashTableForList(b *testing.B, casTest *hashJoinTestCase) {
dataSource2 := buildMockDataSource(opt)

dataSource1.prepareChunks()
-exec := prepare4Join(casTest, dataSource1, dataSource2)
-tmpCtx := context.Background()
-if err := exec.Open(tmpCtx); err != nil {
-b.Fatal(err)
-}
b.ResetTimer()
for i := 0; i < b.N; i++ {
b.StopTimer()
-exec.rowContainer = nil
-exec.memTracker = memory.NewTracker(exec.id, exec.ctx.GetSessionVars().MemQuotaHashJoin)
+exec := prepare4Join(casTest, dataSource1, dataSource2)
+tmpCtx := context.Background()
+if err := exec.Open(tmpCtx); err != nil {
+b.Fatal(err)
+}
exec.prepared = true

innerResultCh := make(chan *chunk.Chunk, len(dataSource1.chunks))
for _, chk := range dataSource1.chunks {
innerResultCh <- chk
@@ -676,25 +706,37 @@ func benchmarkBuildHashTableForList(b *testing.B, casTest *hashJoinTestCase) {
if err := exec.buildHashTableForList(innerResultCh); err != nil {
b.Fatal(err)
}

+if err := exec.Close(); err != nil {
+b.Fatal(err)
+}
b.StopTimer()
+if exec.rowContainer.alreadySpilled() != casTest.disk {
+b.Fatal("wrong usage with disk")
+}
}
}

func BenchmarkBuildHashTableForList(b *testing.B) {
+lvl := log.GetLevel()
+log.SetLevel(zapcore.ErrorLevel)
+defer log.SetLevel(lvl)

b.ReportAllocs()
cas := defaultHashJoinTestCase()
b.Run(fmt.Sprintf("%v", cas), func(b *testing.B) {
benchmarkBuildHashTableForList(b, cas)
})

-cas.keyIdx = []int{0}
-b.Run(fmt.Sprintf("%v", cas), func(b *testing.B) {
-benchmarkBuildHashTableForList(b, cas)
-})

-cas.keyIdx = []int{0}
-cas.rows = 10
-b.Run(fmt.Sprintf("%v", cas), func(b *testing.B) {
-benchmarkBuildHashTableForList(b, cas)
-})
+rows := []int{10, 100000}
+keyIdxs := [][]int{{0, 1}, {0}}
+disks := []bool{false, true}
+for _, row := range rows {
+for _, keyIdx := range keyIdxs {
+for _, disk := range disks {
+cas.rows = row
+cas.keyIdx = keyIdx
+cas.disk = disk
+b.Run(fmt.Sprintf("%v", cas), func(b *testing.B) {
+benchmarkBuildHashTableForList(b, cas)
+})
+}
+}
+}
}
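
The nested loops replace the hand-written cases with the full 2 × 2 × 2 matrix of rows (10, 100000), join-key sets ({0, 1}, {0}), and disk (false, true), so BenchmarkBuildHashTableForList now emits eight sub-benchmarks, each named by hashJoinTestCase.String(). They can be run with, for example, go test -run XX -bench BenchmarkBuildHashTableForList -benchmem ./executor (the go tool replaces spaces in sub-benchmark names with underscores).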
