initialize the size of rowHashMap from the estimated row count.
SunRunAway committed Aug 30, 2019
1 parent 63a0c3f commit a086d7f
Showing 4 changed files with 44 additions and 23 deletions.
17 changes: 9 additions & 8 deletions executor/benchmark_test.go
@@ -555,14 +555,15 @@ func prepare4Join(testCase *hashJoinTestCase, innerExec, outerExec Executor) *Ha
 		joinKeys = append(joinKeys, cols0[keyIdx])
 	}
 	e := &HashJoinExec{
-		baseExecutor: newBaseExecutor(testCase.ctx, joinSchema, stringutil.StringerStr("HashJoin"), innerExec, outerExec),
-		concurrency:  uint(testCase.concurrency),
-		joinType:     0, // InnerJoin
-		isOuterJoin:  false,
-		innerKeys:    joinKeys,
-		outerKeys:    joinKeys,
-		innerExec:    innerExec,
-		outerExec:    outerExec,
+		baseExecutor:    newBaseExecutor(testCase.ctx, joinSchema, stringutil.StringerStr("HashJoin"), innerExec, outerExec),
+		concurrency:     uint(testCase.concurrency),
+		joinType:        0, // InnerJoin
+		isOuterJoin:     false,
+		innerKeys:       joinKeys,
+		outerKeys:       joinKeys,
+		innerExec:       innerExec,
+		outerExec:       outerExec,
+		innerStatsCount: float64(testCase.rows),
 	}
 	defaultValues := make([]types.Datum, e.innerExec.Schema().Len())
 	lhsTypes, rhsTypes := retTypes(innerExec), retTypes(outerExec)
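The benchmark setup above now passes testCase.rows as innerStatsCount, so the hash join can pre-size its hash table from a known row count. As a standalone illustration only (not part of this commit; the package and benchmark names are made up), the effect of a map capacity hint can be measured with Go's testing package roughly like this:

package hint_test

import "testing"

const rows = 1 << 20

// BenchmarkMapNoHint grows the map incrementally from zero capacity,
// paying for repeated rehashing as it grows.
func BenchmarkMapNoHint(b *testing.B) {
	for i := 0; i < b.N; i++ {
		m := make(map[uint64]uint64)
		for k := uint64(0); k < rows; k++ {
			m[k] = k
		}
	}
}

// BenchmarkMapWithHint asks the runtime for enough buckets up front.
func BenchmarkMapWithHint(b *testing.B) {
	for i := 0; i < b.N; i++ {
		m := make(map[uint64]uint64, rows)
		for k := uint64(0); k < rows; k++ {
			m[k] = k
		}
	}
}

Run with `go test -bench=.` to compare the two; the hinted version typically allocates less and runs faster for large builds.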
9 changes: 5 additions & 4 deletions executor/builder.go
@@ -991,10 +991,11 @@ func (b *executorBuilder) buildHashJoin(v *plannercore.PhysicalHashJoin) Executo
 	}
 
 	e := &HashJoinExec{
-		baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ExplainID(), leftExec, rightExec),
-		concurrency:  v.Concurrency,
-		joinType:     v.JoinType,
-		isOuterJoin:  v.JoinType.IsOuterJoin(),
+		baseExecutor:    newBaseExecutor(b.ctx, v.Schema(), v.ExplainID(), leftExec, rightExec),
+		concurrency:     v.Concurrency,
+		joinType:        v.JoinType,
+		isOuterJoin:     v.JoinType.IsOuterJoin(),
+		innerStatsCount: v.Children()[v.InnerChildIdx].StatsCount(),
 	}
 
 	defaultValues := v.DefaultValues
13 changes: 8 additions & 5 deletions executor/hash_table.go
@@ -38,11 +38,11 @@ type hashRowContainer struct {
 }
 
 func newHashRowContainer(
-	sc *stmtctx.StatementContext,
+	sc *stmtctx.StatementContext, statCount int,
 	allTypes []*types.FieldType, keyColIdx []int, initCap, maxChunkSize int) *hashRowContainer {
 
 	c := &hashRowContainer{
-		hashTable: newRowHashMap(),
+		hashTable: newRowHashMapWithStatCount(statCount),
 		sc:        sc,
 		allTypes:  allTypes,
 		keyColIdx: keyColIdx,
@@ -207,14 +207,17 @@ type rowHashMap struct {
 }
 
 // newRowHashMap creates a new rowHashMap.
-func newRowHashMap() *rowHashMap {
+func newRowHashMapWithStatCount(statCount int) *rowHashMap {
 	m := new(rowHashMap)
-	// TODO(fengliyuan): initialize the size of map from the estimated row count for better performance.
-	m.hashTable = make(map[uint64]entryAddr)
+	m.hashTable = make(map[uint64]entryAddr, statCount)
 	m.entryStore.init()
 	return m
 }
+
+func newRowHashMap() *rowHashMap {
+	return newRowHashMapWithStatCount(0)
+}
 
 // Put puts the key/rowPtr pairs to the rowHashMap, multiple rowPtrs are stored in a list.
 func (m *rowHashMap) Put(hashKey uint64, rowPtr chunk.RowPtr) {
 	oldEntryAddr := m.hashTable[hashKey]
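For context, the capacity argument to make is only a hint, not a limit: a rowHashMap created with a small or zero statCount still accepts any number of keys, which is why the old newRowHashMap can simply delegate with 0. A minimal standalone sketch of that behavior (illustrative, not from this commit):

package main

import "fmt"

func main() {
	// The second argument to make is a size hint, not a cap;
	// the map keeps growing past it as keys are inserted.
	m := make(map[uint64]int, 4)
	for k := uint64(0); k < 1000; k++ {
		m[k] = int(k)
	}
	fmt.Println(len(m)) // 1000
}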
28 changes: 22 additions & 6 deletions executor/join.go
@@ -40,11 +40,12 @@ var (
 type HashJoinExec struct {
 	baseExecutor
 
-	outerExec   Executor
-	innerExec   Executor
-	outerFilter expression.CNFExprs
-	outerKeys   []*expression.Column
-	innerKeys   []*expression.Column
+	outerExec       Executor
+	innerExec       Executor
+	innerStatsCount float64
+	outerFilter     expression.CNFExprs
+	outerKeys       []*expression.Column
+	innerKeys       []*expression.Column
 
 	// concurrency is the number of partition, build and join workers.
 	concurrency uint
@@ -492,13 +492,28 @@ func (e *HashJoinExec) fetchInnerAndBuildHashTable(ctx context.Context) {
 	}
 }
 
+const (
+	// statCountMaxFactor defines the factor of the maximum statCount relative to maxChunkSize:
+	// the maximum statCount is maxChunkSize * statCountMaxFactor.
+	// This threshold prevents innerStatsCount from becoming too large and causing a performance regression.
+	statCountMaxFactor = 10 * 1024
+
+	// statCountDivisor is the divisor applied to innerStatsCount before it is used as the map size hint.
+	// Dividing by it prevents innerStatsCount from becoming too large and causing a performance regression.
+	statCountDivisor = 8
+)
+
 // buildHashTableForList builds hash table from `list`.
 func (e *HashJoinExec) buildHashTableForList(innerResultCh <-chan *chunk.Chunk) error {
 	innerKeyColIdx := make([]int, len(e.innerKeys))
 	for i := range e.innerKeys {
 		innerKeyColIdx[i] = e.innerKeys[i].Index
 	}
-	e.rowContainer = newHashRowContainer(e.ctx.GetSessionVars().StmtCtx,
+	statCount := int(e.innerStatsCount / statCountDivisor)
+	if statCount > e.maxChunkSize*statCountMaxFactor {
+		statCount = e.maxChunkSize * statCountMaxFactor
+	}
+	e.rowContainer = newHashRowContainer(e.ctx.GetSessionVars().StmtCtx, statCount,
 		e.innerExec.base().retFieldTypes, innerKeyColIdx,
 		e.initCap, e.maxChunkSize)
 	e.rowContainer.GetMemTracker().AttachTo(e.memTracker)
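Taken together, the sizing rule above amounts to min(innerStatsCount / statCountDivisor, maxChunkSize * statCountMaxFactor). A standalone sketch of that computation with made-up inputs (the helper name is illustrative, not part of the commit):

package main

import "fmt"

const (
	statCountMaxFactor = 10 * 1024
	statCountDivisor   = 8
)

// estimatedStatCount mirrors the capping logic in buildHashTableForList:
// divide the optimizer's row estimate, then clamp it to maxChunkSize*statCountMaxFactor.
func estimatedStatCount(innerStatsCount float64, maxChunkSize int) int {
	statCount := int(innerStatsCount / statCountDivisor)
	if statCount > maxChunkSize*statCountMaxFactor {
		statCount = maxChunkSize * statCountMaxFactor
	}
	return statCount
}

func main() {
	// With a 1M-row estimate and a 1024 chunk size, the hint is
	// 1000000/8 = 125000, well under the 1024*10240 cap.
	fmt.Println(estimatedStatCount(1e6, 1024))
}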
