executor: reorg codes for hashtable in HashJoinExec (#11937)
SunRunAway authored and XuHuaiyu committed Sep 3, 2019
1 parent f9d8541 commit 1ff620d
Showing 5 changed files with 310 additions and 154 deletions.
41 changes: 24 additions & 17 deletions executor/benchmark_test.go
@@ -154,9 +154,9 @@ func buildMockDataSource(opt mockDataSourceParameters) *mockDataSource {
        colData[i] = m.genColDatums(i)
    }

-   m.genData = make([]*chunk.Chunk, (m.p.rows+m.initCap-1)/m.initCap)
+   m.genData = make([]*chunk.Chunk, (m.p.rows+m.maxChunkSize-1)/m.maxChunkSize)
    for i := range m.genData {
-       m.genData[i] = chunk.NewChunkWithCapacity(retTypes(m), m.ctx.GetSessionVars().MaxChunkSize)
+       m.genData[i] = chunk.NewChunkWithCapacity(retTypes(m), m.maxChunkSize)
    }

    for i := 0; i < m.p.rows; i++ {
@@ -555,14 +555,15 @@ func prepare4Join(testCase *hashJoinTestCase, innerExec, outerExec Executor) *HashJoinExec {
        joinKeys = append(joinKeys, cols0[keyIdx])
    }
    e := &HashJoinExec{
-       baseExecutor: newBaseExecutor(testCase.ctx, joinSchema, stringutil.StringerStr("HashJoin"), innerExec, outerExec),
-       concurrency:  uint(testCase.concurrency),
-       joinType:     0, // InnerJoin
-       isOuterJoin:  false,
-       innerKeys:    joinKeys,
-       outerKeys:    joinKeys,
-       innerExec:    innerExec,
-       outerExec:    outerExec,
+       baseExecutor:  newBaseExecutor(testCase.ctx, joinSchema, stringutil.StringerStr("HashJoin"), innerExec, outerExec),
+       concurrency:   uint(testCase.concurrency),
+       joinType:      0, // InnerJoin
+       isOuterJoin:   false,
+       innerKeys:     joinKeys,
+       outerKeys:     joinKeys,
+       innerExec:     innerExec,
+       outerExec:     outerExec,
+       innerEstCount: float64(testCase.rows),
    }
    defaultValues := make([]types.Datum, e.innerExec.Schema().Len())
    lhsTypes, rhsTypes := retTypes(innerExec), retTypes(outerExec)
@@ -663,13 +664,13 @@ func benchmarkBuildHashTableForList(b *testing.B, casTest *hashJoinTestCase) {
    b.ResetTimer()
    for i := 0; i < b.N; i++ {
        b.StopTimer()
-       innerResultCh := make(chan *chunk.Chunk, 1)
-       go func() {
-           for _, chk := range dataSource1.genData {
-               innerResultCh <- chk
-           }
-           close(innerResultCh)
-       }()
+       exec.rowContainer = nil
+       exec.memTracker = memory.NewTracker(exec.id, exec.ctx.GetSessionVars().MemQuotaHashJoin)
+       innerResultCh := make(chan *chunk.Chunk, len(dataSource1.chunks))
+       for _, chk := range dataSource1.chunks {
+           innerResultCh <- chk
+       }
+       close(innerResultCh)

        b.StartTimer()
        if err := exec.buildHashTableForList(innerResultCh); err != nil {
@@ -690,4 +691,10 @@ func BenchmarkBuildHashTableForList(b *testing.B) {
    b.Run(fmt.Sprintf("%v", cas), func(b *testing.B) {
        benchmarkBuildHashTableForList(b, cas)
    })
+
+   cas.keyIdx = []int{0}
+   cas.rows = 10
+   b.Run(fmt.Sprintf("%v", cas), func(b *testing.B) {
+       benchmarkBuildHashTableForList(b, cas)
+   })
}
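A note on the rewritten benchmark body: it resets the executor's rowContainer and memory tracker on every iteration so runs do not reuse state, and it buffers the channel to the full input size and fills it while the timer is stopped, so only the consumer is measured and no producer goroutine can stall it. A minimal standalone sketch of that timing pattern, for any _test.go file (the names consumeAll and BenchmarkConsumeAll are illustrative, not from the commit):

package executor

import "testing"

// consumeAll stands in for the code under measurement
// (buildHashTableForList in the real benchmark).
func consumeAll(ch <-chan int) (sum int) {
    for v := range ch {
        sum += v
    }
    return sum
}

func BenchmarkConsumeAll(b *testing.B) {
    data := []int{1, 2, 3, 4}
    for i := 0; i < b.N; i++ {
        b.StopTimer()
        // Buffer to len(data) and fill completely before restarting the
        // timer; close so the consumer's range loop terminates.
        ch := make(chan int, len(data))
        for _, v := range data {
            ch <- v
        }
        close(ch)
        b.StartTimer()
        consumeAll(ch)
    }
}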
9 changes: 5 additions & 4 deletions executor/builder.go
@@ -991,10 +991,11 @@ func (b *executorBuilder) buildHashJoin(v *plannercore.PhysicalHashJoin) Executor {
    }

    e := &HashJoinExec{
-       baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ExplainID(), leftExec, rightExec),
-       concurrency:  v.Concurrency,
-       joinType:     v.JoinType,
-       isOuterJoin:  v.JoinType.IsOuterJoin(),
+       baseExecutor:  newBaseExecutor(b.ctx, v.Schema(), v.ExplainID(), leftExec, rightExec),
+       concurrency:   v.Concurrency,
+       joinType:      v.JoinType,
+       isOuterJoin:   v.JoinType.IsOuterJoin(),
+       innerEstCount: v.Children()[v.InnerChildIdx].StatsCount(),
    }

    defaultValues := v.DefaultValues
203 changes: 185 additions & 18 deletions executor/hash_table.go
@@ -14,31 +14,199 @@
package executor

import (
+   "hash"
+
+   "github.com/pingcap/errors"
+   "github.com/pingcap/tidb/sessionctx"
+   "github.com/pingcap/tidb/sessionctx/stmtctx"
+   "github.com/pingcap/tidb/types"
    "github.com/pingcap/tidb/util/chunk"
+   "github.com/pingcap/tidb/util/codec"
+   "github.com/pingcap/tidb/util/memory"
)

+const (
+   // estCountMaxFactor defines the factor of estCountMax with maxChunkSize.
+   // estCountMax is maxChunkSize * estCountMaxFactor, the maximum threshold of estCount.
+   // If estCount is larger than estCountMax, set estCount to estCountMax.
+   // This threshold prevents innerEstCount from being so large that it causes a performance and memory regression.
+   estCountMaxFactor = 10 * 1024
+
+   // estCountMinFactor defines the factor of estCountMin with maxChunkSize.
+   // estCountMin is maxChunkSize * estCountMinFactor, the minimum threshold of estCount.
+   // If estCount is smaller than estCountMin, set estCount to 0.
+   // This threshold prevents innerEstCount from being so small that it causes a performance regression.
+   estCountMinFactor = 8
+
+   // estCountDivisor defines the divisor of innerEstCount.
+   // This divisor prevents innerEstCount from being so large that it causes a performance regression.
+   estCountDivisor = 8
+)

-const maxEntrySliceLen = 8 * 1024
+// hashContext keeps the needed hash context of a db table in hash join.
+type hashContext struct {
+   allTypes  []*types.FieldType
+   keyColIdx []int
+   h         hash.Hash64
+   buf       []byte
+}
+
+// hashRowContainer handles the rows and the hash map of a table.
+// TODO: support spilling out to disk when memory is limited.
+type hashRowContainer struct {
+   records   *chunk.List
+   hashTable *rowHashMap
+
+   sc   *stmtctx.StatementContext
+   hCtx *hashContext
+}

+func newHashRowContainer(sctx sessionctx.Context, estCount int, hCtx *hashContext, initList *chunk.List) *hashRowContainer {
+   maxChunkSize := sctx.GetSessionVars().MaxChunkSize
+   // The estCount from the cost model is not very accurate, and an
+   // overestimate wastes memory. As a rough protection, first divide it
+   // by estCountDivisor, then clamp it with a maximum and a minimum threshold.
+   estCount /= estCountDivisor
+   if estCount > maxChunkSize*estCountMaxFactor {
+       estCount = maxChunkSize * estCountMaxFactor
+   }
+   if estCount < maxChunkSize*estCountMinFactor {
+       estCount = 0
+   }
+   c := &hashRowContainer{
+       records:   initList,
+       hashTable: newRowHashMap(estCount),
+
+       sc:   sctx.GetSessionVars().StmtCtx,
+       hCtx: hCtx,
+   }
+   return c
+}
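For intuition, assume maxChunkSize is 1024 (an assumption; the session variable controls the real value). Then estCountMax = 1024 * 10 * 1024 = 10,485,760 and estCountMin = 1024 * 8 = 8,192. An innerEstCount of 100,000 becomes 100,000 / 8 = 12,500, which lies between the thresholds, so the map is sized for about 12,500 entries up front; an innerEstCount of 50,000 becomes 6,250, which is below 8,192 and is dropped to 0, leaving the map to grow on demand.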

+func (c *hashRowContainer) GetMemTracker() *memory.Tracker {
+   return c.records.GetMemTracker()
+}

+// GetMatchedRows gets the matched rows for probeRow. It can be called
+// from multiple goroutines as long as each goroutine keeps its own
+// h and buf.
+func (c *hashRowContainer) GetMatchedRows(probeRow chunk.Row, hCtx *hashContext) (matched []chunk.Row, err error) {
+   hasNull, key, err := c.getJoinKeyFromChkRow(c.sc, probeRow, hCtx)
+   if err != nil || hasNull {
+       return
+   }
+   innerPtrs := c.hashTable.Get(key)
+   if len(innerPtrs) == 0 {
+       return
+   }
+   matched = make([]chunk.Row, 0, len(innerPtrs))
+   for _, ptr := range innerPtrs {
+       matchedRow := c.records.GetRow(ptr)
+       var ok bool
+       ok, err = c.matchJoinKey(matchedRow, probeRow, hCtx)
+       if err != nil {
+           return
+       }
+       if !ok {
+           continue
+       }
+       matched = append(matched, matchedRow)
+   }
+   /* TODO(fengliyuan): add a test case for this path
+   if len(matched) == 0 {
+       // noop
+   }
+   */
+   return
+}

+// matchJoinKey checks if join keys of buildRow and probeRow are logically equal.
+func (c *hashRowContainer) matchJoinKey(buildRow, probeRow chunk.Row, probeHCtx *hashContext) (ok bool, err error) {
+   return codec.EqualChunkRow(c.sc,
+       buildRow, c.hCtx.allTypes, c.hCtx.keyColIdx,
+       probeRow, probeHCtx.allTypes, probeHCtx.keyColIdx)
+}

+// PutChunk puts a chunk into hashRowContainer and builds the hash map. It is not thread-safe.
+// key of the hash table: hash value of the key columns
+// value of the hash table: RowPtr of the corresponding row
+func (c *hashRowContainer) PutChunk(chk *chunk.Chunk) error {
+   chkIdx := uint32(c.records.NumChunks())
+   c.records.Add(chk)
+   var (
+       hasNull bool
+       err     error
+       key     uint64
+   )
+   numRows := chk.NumRows()
+   for j := 0; j < numRows; j++ {
+       hasNull, key, err = c.getJoinKeyFromChkRow(c.sc, chk.GetRow(j), c.hCtx)
+       if err != nil {
+           return errors.Trace(err)
+       }
+       if hasNull {
+           continue
+       }
+       rowPtr := chunk.RowPtr{ChkIdx: chkIdx, RowIdx: uint32(j)}
+       c.hashTable.Put(key, rowPtr)
+   }
+   return nil
+}

+// getJoinKeyFromChkRow fetches the join keys from row and calculates the hash value.
+func (*hashRowContainer) getJoinKeyFromChkRow(sc *stmtctx.StatementContext, row chunk.Row, hCtx *hashContext) (hasNull bool, key uint64, err error) {
+   for _, i := range hCtx.keyColIdx {
+       if row.IsNull(i) {
+           return true, 0, nil
+       }
+   }
+   hCtx.h.Reset()
+   err = codec.HashChunkRow(sc, hCtx.h, row, hCtx.allTypes, hCtx.keyColIdx, hCtx.buf)
+   return false, hCtx.h.Sum64(), err
+}

+func (c hashRowContainer) Len() int {
+   return c.hashTable.Len()
+}
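Taken together, the container is built once with PutChunk and then probed concurrently with GetMatchedRows. A hedged usage sketch follows; buildAndProbe is a hypothetical helper, not part of the commit, and fnv.New64() stands in for whatever hash.Hash64 the join executor actually wires in:

package executor

import (
    "hash/fnv"

    "github.com/pingcap/tidb/sessionctx"
    "github.com/pingcap/tidb/types"
    "github.com/pingcap/tidb/util/chunk"
)

// buildAndProbe shows the intended call sequence: build once, probe after.
func buildAndProbe(sctx sessionctx.Context, innerTypes, probeTypes []*types.FieldType,
    keyIdx []int, estCount int, innerChunks []*chunk.Chunk, probeRow chunk.Row) ([]chunk.Row, error) {

    maxChunkSize := sctx.GetSessionVars().MaxChunkSize
    buildCtx := &hashContext{allTypes: innerTypes, keyColIdx: keyIdx, h: fnv.New64(), buf: make([]byte, 1)}
    c := newHashRowContainer(sctx, estCount, buildCtx,
        chunk.NewList(innerTypes, maxChunkSize, maxChunkSize))

    // Build phase: single goroutine, since PutChunk is not thread-safe.
    for _, chk := range innerChunks {
        if err := c.PutChunk(chk); err != nil {
            return nil, err
        }
    }

    // Probe phase: each probing goroutine must own its hashContext
    // (its own h and buf), per the GetMatchedRows contract above.
    probeCtx := &hashContext{allTypes: probeTypes, keyColIdx: keyIdx, h: fnv.New64(), buf: make([]byte, 1)}
    return c.GetMatchedRows(probeRow, probeCtx)
}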

+const (
+   initialEntrySliceLen = 64
+   maxEntrySliceLen     = 8 * 1024
+)

type entry struct {
    ptr  chunk.RowPtr
    next entryAddr
}

type entryStore struct {
-   slices   [][]entry
-   sliceIdx uint32
-   sliceLen uint32
+   slices [][]entry
}

+func (es *entryStore) init() {
+   es.slices = [][]entry{make([]entry, 0, initialEntrySliceLen)}
+   // Reserve the first empty entry, so entryAddr{} can represent nullEntryAddr.
+   reserved := es.put(entry{})
+   if reserved != nullEntryAddr {
+       panic("entryStore: first entry is not nullEntryAddr")
+   }
+}

func (es *entryStore) put(e entry) entryAddr {
-   if es.sliceLen == maxEntrySliceLen {
-       es.slices = append(es.slices, make([]entry, 0, maxEntrySliceLen))
-       es.sliceLen = 0
-       es.sliceIdx++
+   sliceIdx := uint32(len(es.slices) - 1)
+   slice := es.slices[sliceIdx]
+   if len(slice) == cap(slice) {
+       size := cap(slice) * 2
+       if size >= maxEntrySliceLen {
+           size = maxEntrySliceLen
+       }
+       slice = make([]entry, 0, size)
+       es.slices = append(es.slices, slice)
+       sliceIdx++
    }
-   addr := entryAddr{sliceIdx: es.sliceIdx, offset: es.sliceLen}
-   es.slices[es.sliceIdx] = append(es.slices[es.sliceIdx], e)
-   es.sliceLen++
+   addr := entryAddr{sliceIdx: sliceIdx, offset: uint32(len(slice))}
+   es.slices[sliceIdx] = append(slice, e)
    return addr
}
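The new put grows each slice by doubling, from initialEntrySliceLen (64) up to the maxEntrySliceLen cap (8 * 1024): 64, 128, 256, and so on to 8192, after which every overflow opens a fresh 8192-capacity slice. Because a full slice is never reallocated in place, a previously returned entryAddr stays valid forever. A runnable sketch of the same growth policy with deliberately tiny stand-in constants (initLen and maxLen are hypothetical, chosen only for readability):

package main

import "fmt"

const (
    initLen = 4  // stands in for initialEntrySliceLen (64)
    maxLen  = 16 // stands in for maxEntrySliceLen (8 * 1024)
)

func main() {
    slices := [][]int{make([]int, 0, initLen)}
    for i := 0; i < 40; i++ {
        last := slices[len(slices)-1]
        if len(last) == cap(last) {
            size := cap(last) * 2
            if size >= maxLen {
                size = maxLen
            }
            // Open a fresh slice instead of growing in place, so existing
            // (sliceIdx, offset) addresses never move.
            slices = append(slices, make([]int, 0, size))
        }
        idx := len(slices) - 1
        slices[idx] = append(slices[idx], i)
    }
    for _, s := range slices {
        fmt.Printf("len=%d cap=%d\n", len(s), cap(s))
    }
    // Caps print as 4, 8, 16, 16: doubling until the cap, then fixed.
}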

@@ -56,20 +224,19 @@ var nullEntryAddr = entryAddr{}
// rowHashMap stores multiple rowPtrs of rows for a given key with minimum GC overhead.
// A given key can store multiple values.
// It is not thread-safe and should only be used in one goroutine.
-// TODO(fengliyuan): add unit test for this.
type rowHashMap struct {
    entryStore entryStore
    hashTable  map[uint64]entryAddr
    length     int
}

-// newRowHashMap creates a new rowHashMap.
-func newRowHashMap() *rowHashMap {
+// newRowHashMap creates a new rowHashMap. estCount is the estimated size of the
+// hash map; if unknown, set it to 0.
+func newRowHashMap(estCount int) *rowHashMap {
    m := new(rowHashMap)
-   // TODO(fengliyuan): initialize the size of map from the estimated row count for better performance.
-   m.hashTable = make(map[uint64]entryAddr)
-   m.entryStore.slices = [][]entry{make([]entry, 0, 64)}
-   // Reserve the first empty entry, so entryAddr{} can represent nullEntryAddr.
-   m.entryStore.put(entry{})
+   m.hashTable = make(map[uint64]entryAddr, estCount)
+   m.entryStore.init()
    return m
}
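Put and Get are collapsed in this view. For orientation, a plausible sketch consistent with the structures above, not the commit's verbatim code: Put chains each new entry in front of the key's previous head, and Get walks the chain and then reverses the result so callers see insertion order, which the new test below relies on.

// Sketch only: reconstructed from the surrounding types, not verbatim.
func (m *rowHashMap) Put(hashKey uint64, rowPtr chunk.RowPtr) {
    oldEntryAddr := m.hashTable[hashKey] // zero value is nullEntryAddr
    e := entry{ptr: rowPtr, next: oldEntryAddr}
    m.hashTable[hashKey] = m.entryStore.put(e)
    m.length++
}

func (m *rowHashMap) Get(hashKey uint64) (rowPtrs []chunk.RowPtr) {
    for addr := m.hashTable[hashKey]; addr != nullEntryAddr; {
        e := m.entryStore.slices[addr.sliceIdx][addr.offset]
        rowPtrs = append(rowPtrs, e.ptr)
        addr = e.next
    }
    // The chain is newest-first; reverse to return insertion order.
    for i, j := 0, len(rowPtrs)-1; i < j; i, j = i+1, j-1 {
        rowPtrs[i], rowPtrs[j] = rowPtrs[j], rowPtrs[i]
    }
    return
}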

50 changes: 50 additions & 0 deletions executor/hash_table_test.go
@@ -0,0 +1,50 @@
// Copyright 2019 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package executor

import (
    . "github.com/pingcap/check"
    "github.com/pingcap/tidb/util/chunk"
)

func (s *pkgTestSuite) TestRowHashMap(c *C) {
    m := newRowHashMap(0)
    m.Put(1, chunk.RowPtr{ChkIdx: 1, RowIdx: 1})
    c.Check(m.Get(1), DeepEquals, []chunk.RowPtr{{ChkIdx: 1, RowIdx: 1}})

    rawData := map[uint64][]chunk.RowPtr{}
    for i := uint64(0); i < 10; i++ {
        for j := uint64(0); j < initialEntrySliceLen*i; j++ {
            rawData[i] = append(rawData[i], chunk.RowPtr{ChkIdx: uint32(i), RowIdx: uint32(j)})
        }
    }
    m = newRowHashMap(0)
    // put all rawData into m vertically
    for j := uint64(0); j < initialEntrySliceLen*9; j++ {
        for i := 9; i >= 0; i-- {
            i := uint64(i)
            if !(j < initialEntrySliceLen*i) {
                break
            }
            m.Put(i, rawData[i][j])
        }
    }
    // check
    totalCount := 0
    for i := uint64(0); i < 10; i++ {
        totalCount += len(rawData[i])
        c.Check(m.Get(i), DeepEquals, rawData[i])
    }
    c.Check(m.Len(), Equals, totalCount)
}
