Skip to content

Commit

Permalink
planner: add aggregation hints TIDB_HASHAGG and TIDB_STREAMAGG (p…
Browse files Browse the repository at this point in the history
  • Loading branch information
foreyes committed Sep 16, 2019
1 parent 2238434 commit 2dd2878
Show file tree
Hide file tree
Showing 8 changed files with 285 additions and 65 deletions.
16 changes: 8 additions & 8 deletions cmd/explaintest/r/topn_push_down.result
Expand Up @@ -240,19 +240,19 @@ explain select /*+ TIDB_SMJ(t1, t2) */ * from t t1 join t t2 on t1.a = t2.a limi
id count task operator info
Limit_11 5.00 root offset:0, count:5
└─MergeJoin_12 5.00 root inner join, left key:test.t1.a, right key:test.t2.a
├─IndexReader_14 4.00 root index:IndexScan_13
│ └─IndexScan_13 4.00 cop table:t1, index:a, range:[NULL,+inf], keep order:true, stats:pseudo
└─IndexReader_16 4.00 root index:IndexScan_15
└─IndexScan_15 4.00 cop table:t2, index:a, range:[NULL,+inf], keep order:true, stats:pseudo
├─IndexReader_15 4.00 root index:IndexScan_14
│ └─IndexScan_14 4.00 cop table:t1, index:a, range:[NULL,+inf], keep order:true, stats:pseudo
└─IndexReader_17 4.00 root index:IndexScan_16
└─IndexScan_16 4.00 cop table:t2, index:a, range:[NULL,+inf], keep order:true, stats:pseudo
explain select /*+ TIDB_SMJ(t1, t2) */ * from t t1 left join t t2 on t1.a = t2.a where t2.a is null limit 5;
id count task operator info
Limit_12 5.00 root offset:0, count:5
└─Selection_13 5.00 root isnull(test.t2.a)
└─MergeJoin_14 5.00 root left outer join, left key:test.t1.a, right key:test.t2.a
├─IndexReader_16 4.00 root index:IndexScan_15
│ └─IndexScan_15 4.00 cop table:t1, index:a, range:[NULL,+inf], keep order:true, stats:pseudo
└─IndexReader_18 4.00 root index:IndexScan_17
└─IndexScan_17 4.00 cop table:t2, index:a, range:[NULL,+inf], keep order:true, stats:pseudo
├─IndexReader_17 4.00 root index:IndexScan_16
│ └─IndexScan_16 4.00 cop table:t1, index:a, range:[NULL,+inf], keep order:true, stats:pseudo
└─IndexReader_19 4.00 root index:IndexScan_18
└─IndexScan_18 4.00 cop table:t2, index:a, range:[NULL,+inf], keep order:true, stats:pseudo
explain select /*+ TIDB_HJ(t1, t2) */ * from t t1 join t t2 on t1.a = t2.a limit 5;
id count task operator info
Limit_11 5.00 root offset:0, count:5
Expand Down
24 changes: 12 additions & 12 deletions executor/join_test.go
Expand Up @@ -143,13 +143,13 @@ func (s *testSuite2) TestJoin(c *C) {
tk.MustQuery("select /*+ TIDB_INLJ(t1) */ * from t right outer join t1 on t.a=t1.a").Check(testkit.Rows("1 1 1 2", "1 1 1 3", "1 1 1 4", "3 3 3 4", "<nil> <nil> 4 5"))
tk.MustQuery("select /*+ TIDB_INLJ(t) */ avg(t.b) from t right outer join t1 on t.a=t1.a").Check(testkit.Rows("1.5000"))

// Test that two conflict hints will return error.
err := tk.ExecToErr("select /*+ TIDB_INLJ(t) TIDB_SMJ(t) */ * from t join t1 on t.a=t1.a")
c.Assert(err, NotNil)
err = tk.ExecToErr("select /*+ TIDB_INLJ(t) TIDB_HJ(t) */ from t join t1 on t.a=t1.a")
c.Assert(err, NotNil)
err = tk.ExecToErr("select /*+ TIDB_SMJ(t) TIDB_HJ(t) */ from t join t1 on t.a=t1.a")
c.Assert(err, NotNil)
// Test that two conflict hints will return warning.
tk.MustExec("select /*+ TIDB_INLJ(t) TIDB_SMJ(t) */ * from t join t1 on t.a=t1.a")
c.Assert(tk.Se.GetSessionVars().StmtCtx.GetWarnings(), HasLen, 1)
tk.MustExec("select /*+ TIDB_INLJ(t) TIDB_HJ(t) */ * from t join t1 on t.a=t1.a")
c.Assert(tk.Se.GetSessionVars().StmtCtx.GetWarnings(), HasLen, 1)
tk.MustExec("select /*+ TIDB_SMJ(t) TIDB_HJ(t) */ * from t join t1 on t.a=t1.a")
c.Assert(tk.Se.GetSessionVars().StmtCtx.GetWarnings(), HasLen, 1)

tk.MustExec("drop table if exists t")
tk.MustExec("create table t(a int)")
Expand Down Expand Up @@ -888,11 +888,11 @@ func (s *testSuite2) TestMergejoinOrder(c *C) {
tk.MustExec("insert into t2 select a*100, b*100 from t1;")

tk.MustQuery("explain select /*+ TIDB_SMJ(t2) */ * from t1 left outer join t2 on t1.a=t2.a and t1.a!=3 order by t1.a;").Check(testkit.Rows(
"MergeJoin_15 10000.00 root left outer join, left key:test.t1.a, right key:test.t2.a, left cond:[ne(test.t1.a, 3)]",
"├─TableReader_11 10000.00 root data:TableScan_10",
"│ └─TableScan_10 10000.00 cop table:t1, range:[-inf,+inf], keep order:true, stats:pseudo",
"└─TableReader_13 6666.67 root data:TableScan_12",
" └─TableScan_12 6666.67 cop table:t2, range:[-inf,3), (3,+inf], keep order:true, stats:pseudo",
"MergeJoin_20 10000.00 root left outer join, left key:test.t1.a, right key:test.t2.a, left cond:[ne(test.t1.a, 3)]",
"├─TableReader_12 10000.00 root data:TableScan_11",
"│ └─TableScan_11 10000.00 cop table:t1, range:[-inf,+inf], keep order:true, stats:pseudo",
"└─TableReader_14 6666.67 root data:TableScan_13",
" └─TableScan_13 6666.67 cop table:t2, range:[-inf,3), (3,+inf], keep order:true, stats:pseudo",
))

tk.MustExec("set @@tidb_init_chunk_size=1")
Expand Down
76 changes: 66 additions & 10 deletions planner/core/exhaust_physical_plans.go
Expand Up @@ -118,7 +118,7 @@ func (p *PhysicalMergeJoin) tryToGetChildReqProp(prop *property.PhysicalProperty
}

func (p *LogicalJoin) getMergeJoin(prop *property.PhysicalProperty) []PhysicalPlan {
joins := make([]PhysicalPlan, 0, len(p.leftProperties))
joins := make([]PhysicalPlan, 0, len(p.leftProperties)+1)
// The leftProperties caches all the possible properties that are provided by its children.
for _, lhsChildProperty := range p.leftProperties {
offsets := getMaxSortPrefix(lhsChildProperty, p.LeftJoinKeys)
Expand Down Expand Up @@ -159,10 +159,10 @@ func (p *LogicalJoin) getMergeJoin(prop *property.PhysicalProperty) []PhysicalPl
joins = append(joins, mergeJoin)
}
}
// If TiDB_SMJ hint is existed && no join keys in children property,
// it should to enforce merge join.
if len(joins) == 0 && (p.preferJoinType&preferMergeJoin) > 0 {
return p.getEnforcedMergeJoin(prop)
// If TiDB_SMJ hint is existed, it should consider enforce merge join,
// because we can't trust lhsChildProperty completely.
if (p.preferJoinType & preferMergeJoin) > 0 {
joins = append(joins, p.getEnforcedMergeJoin(prop)...)
}

return joins
Expand Down Expand Up @@ -1171,11 +1171,36 @@ func (p *baseLogicalPlan) exhaustPhysicalPlans(_ *property.PhysicalProperty) []P
panic("baseLogicalPlan.exhaustPhysicalPlans() should never be called.")
}

func (la *LogicalAggregation) getEnforcedStreamAggs(prop *property.PhysicalProperty) []PhysicalPlan {
_, desc := prop.AllSameOrder()
enforcedAggs := make([]PhysicalPlan, 0, len(wholeTaskTypes))
childProp := &property.PhysicalProperty{
ExpectedCnt: math.Max(prop.ExpectedCnt*la.inputCount/la.stats.RowCount, prop.ExpectedCnt),
Enforced: true,
Items: property.ItemsFromCols(la.groupByCols, desc),
}

for _, taskTp := range wholeTaskTypes {
copiedChildProperty := new(property.PhysicalProperty)
*copiedChildProperty = *childProp // It's ok to not deep copy the "cols" field.
copiedChildProperty.TaskTp = taskTp

agg := basePhysicalAgg{
GroupByItems: la.GroupByItems,
AggFuncs: la.AggFuncs,
}.initForStream(la.ctx, la.stats.ScaleByExpectCnt(prop.ExpectedCnt), copiedChildProperty)
agg.SetSchema(la.schema.Clone())
enforcedAggs = append(enforcedAggs, agg)
}
return enforcedAggs
}

func (la *LogicalAggregation) getStreamAggs(prop *property.PhysicalProperty) []PhysicalPlan {
all, desc := prop.AllSameOrder()
if len(la.possibleProperties) == 0 || !all {
if !all {
return nil
}

for _, aggFunc := range la.AggFuncs {
if aggFunc.Mode == aggregation.FinalMode {
return nil
Expand All @@ -1186,7 +1211,7 @@ func (la *LogicalAggregation) getStreamAggs(prop *property.PhysicalProperty) []P
return nil
}

streamAggs := make([]PhysicalPlan, 0, len(la.possibleProperties)*(len(wholeTaskTypes)-1))
streamAggs := make([]PhysicalPlan, 0, len(la.possibleProperties)*(len(wholeTaskTypes)-1)+len(wholeTaskTypes))
childProp := &property.PhysicalProperty{
ExpectedCnt: math.Max(prop.ExpectedCnt*la.inputCount/la.stats.RowCount, prop.ExpectedCnt),
}
Expand Down Expand Up @@ -1217,6 +1242,11 @@ func (la *LogicalAggregation) getStreamAggs(prop *property.PhysicalProperty) []P
streamAggs = append(streamAggs, agg)
}
}
// If STREAM_AGG hint is existed, it should consider enforce stream aggregation,
// because we can't trust possibleChildProperty completely.
if (la.preferAggType & preferStreamAgg) > 0 {
streamAggs = append(streamAggs, la.getEnforcedStreamAggs(prop)...)
}
return streamAggs
}

Expand All @@ -1237,9 +1267,35 @@ func (la *LogicalAggregation) getHashAggs(prop *property.PhysicalProperty) []Phy
}

func (la *LogicalAggregation) exhaustPhysicalPlans(prop *property.PhysicalProperty) []PhysicalPlan {
aggs := make([]PhysicalPlan, 0, len(la.possibleProperties)+1)
aggs = append(aggs, la.getHashAggs(prop)...)
aggs = append(aggs, la.getStreamAggs(prop)...)
preferHash := (la.preferAggType & preferHashAgg) > 0
preferStream := (la.preferAggType & preferStreamAgg) > 0
if preferHash && preferStream {
errMsg := "Optimizer aggregation hints are conflicted"
warning := ErrInternal.GenWithStack(errMsg)
la.ctx.GetSessionVars().StmtCtx.AppendWarning(warning)
la.preferAggType = 0
preferHash, preferStream = false, false
}

hashAggs := la.getHashAggs(prop)
if hashAggs != nil && preferHash {
return hashAggs
}

streamAggs := la.getStreamAggs(prop)
if streamAggs != nil && preferStream {
return streamAggs
}

if streamAggs == nil && preferStream {
errMsg := "Optimizer Hint TIDB_STREAMAGG is inapplicable"
warning := ErrInternal.GenWithStack(errMsg)
la.ctx.GetSessionVars().StmtCtx.AppendWarning(warning)
}

aggs := make([]PhysicalPlan, 0, len(hashAggs)+len(streamAggs))
aggs = append(aggs, hashAggs...)
aggs = append(aggs, streamAggs...)
return aggs
}

Expand Down
5 changes: 1 addition & 4 deletions planner/core/expression_rewriter.go
Expand Up @@ -758,10 +758,7 @@ func (er *expressionRewriter) handleInSubquery(ctx context.Context, v *ast.Patte
join.attachOnConds(expression.SplitCNFItems(checkCondition))
// Set join hint for this join.
if er.b.TableHints() != nil {
er.err = join.setPreferredJoinType(er.b.TableHints())
if er.err != nil {
return v, true
}
join.setPreferredJoinType(er.b.TableHints())
}
er.p = join
} else {
Expand Down
65 changes: 35 additions & 30 deletions planner/core/logical_plan_builder.go
Expand Up @@ -54,6 +54,10 @@ const (
TiDBIndexNestedLoopJoin = "tidb_inlj"
// TiDBHashJoin is hint enforce hash join.
TiDBHashJoin = "tidb_hj"
// TiDBHashAgg is hint enforce hash aggregation.
TiDBHashAgg = "tidb_hashagg"
// TiDBStreamAgg is hint enforce stream aggregation.
TiDBStreamAgg = "tidb_streamagg"
)

const (
Expand Down Expand Up @@ -137,6 +141,9 @@ func (b *PlanBuilder) buildAggregation(ctx context.Context, p LogicalPlan, aggFu
plan4Agg.GroupByItems = gbyItems
plan4Agg.SetSchema(schema4Agg)
plan4Agg.collectGroupByColumns()
if hint := b.TableHints(); hint != nil {
plan4Agg.preferAggType = hint.preferAggType
}
return plan4Agg, aggIndexMap, nil
}

Expand Down Expand Up @@ -327,9 +334,9 @@ func extractTableAlias(p LogicalPlan) *model.CIStr {
return nil
}

func (p *LogicalJoin) setPreferredJoinType(hintInfo *tableHintInfo) error {
func (p *LogicalJoin) setPreferredJoinType(hintInfo *tableHintInfo) {
if hintInfo == nil {
return nil
return
}

lhsAlias := extractTableAlias(p.children[0])
Expand All @@ -355,9 +362,10 @@ func (p *LogicalJoin) setPreferredJoinType(hintInfo *tableHintInfo) error {
// If there're multiple join types and one of them is not index join hint,
// then there is a conflict of join types.
if bits.OnesCount(p.preferJoinType) > 1 && (p.preferJoinType^preferRightAsIndexInner^preferLeftAsIndexInner) > 0 {
return errors.New("Join hints are conflict, you can only specify one type of join")
errMsg := "Join hints are conflict, you can only specify one type of join"
warning := ErrInternal.GenWithStack(errMsg)
p.ctx.GetSessionVars().StmtCtx.AppendWarning(warning)
}
return nil
}

func resetNotNullFlag(schema *expression.Schema, start, end int) {
Expand Down Expand Up @@ -424,10 +432,7 @@ func (b *PlanBuilder) buildJoin(ctx context.Context, joinNode *ast.Join) (Logica
joinPlan.redundantSchema = expression.MergeSchema(lRedundant, rRedundant)

// Set preferred join algorithm if some join hints is specified by user.
err = joinPlan.setPreferredJoinType(b.TableHints())
if err != nil {
return nil, err
}
joinPlan.setPreferredJoinType(b.TableHints())

// "NATURAL JOIN" doesn't have "ON" or "USING" conditions.
//
Expand Down Expand Up @@ -1931,8 +1936,9 @@ func (b *PlanBuilder) unfoldWildStar(p LogicalPlan, selectFields []*ast.SelectFi
return resultList, nil
}

func (b *PlanBuilder) pushTableHints(hints []*ast.TableOptimizerHint) bool {
func (b *PlanBuilder) pushTableHints(hints []*ast.TableOptimizerHint) {
var sortMergeTables, INLJTables, hashJoinTables []hintTableInfo
var preferAggType uint
for _, hint := range hints {
switch hint.HintName.L {
case TiDBMergeJoin:
Expand All @@ -1941,19 +1947,20 @@ func (b *PlanBuilder) pushTableHints(hints []*ast.TableOptimizerHint) bool {
INLJTables = tableNames2HintTableInfo(hint.Tables)
case TiDBHashJoin:
hashJoinTables = tableNames2HintTableInfo(hint.Tables)
case TiDBHashAgg:
preferAggType |= preferHashAgg
case TiDBStreamAgg:
preferAggType |= preferStreamAgg
default:
// ignore hints that not implemented
}
}
if len(sortMergeTables)+len(INLJTables)+len(hashJoinTables) > 0 {
b.tableHintInfo = append(b.tableHintInfo, tableHintInfo{
sortMergeJoinTables: sortMergeTables,
indexNestedLoopJoinTables: INLJTables,
hashJoinTables: hashJoinTables,
})
return true
}
return false
b.tableHintInfo = append(b.tableHintInfo, tableHintInfo{
sortMergeJoinTables: sortMergeTables,
indexNestedLoopJoinTables: INLJTables,
hashJoinTables: hashJoinTables,
preferAggType: preferAggType,
})
}

func (b *PlanBuilder) popTableHints() {
Expand Down Expand Up @@ -1983,10 +1990,10 @@ func (b *PlanBuilder) TableHints() *tableHintInfo {
}

func (b *PlanBuilder) buildSelect(ctx context.Context, sel *ast.SelectStmt) (p LogicalPlan, err error) {
if b.pushTableHints(sel.TableHints) {
// table hints are only visible in the current SELECT statement.
defer b.popTableHints()
}
b.pushTableHints(sel.TableHints)
// table hints are only visible in the current SELECT statement.
defer b.popTableHints()

if sel.SelectStmtOpts != nil {
origin := b.inStraightJoin
b.inStraightJoin = sel.SelectStmtOpts.StraightJoin
Expand Down Expand Up @@ -2532,10 +2539,9 @@ func (b *PlanBuilder) buildSemiJoin(outerPlan, innerPlan LogicalPlan, onConditio
}

func (b *PlanBuilder) buildUpdate(ctx context.Context, update *ast.UpdateStmt) (Plan, error) {
if b.pushTableHints(update.TableHints) {
// table hints are only visible in the current UPDATE statement.
defer b.popTableHints()
}
b.pushTableHints(update.TableHints)
// table hints are only visible in the current UPDATE statement.
defer b.popTableHints()

// update subquery table should be forbidden
var asNameList []string
Expand Down Expand Up @@ -2752,10 +2758,9 @@ func extractTableAsNameForUpdate(p LogicalPlan, asNames map[*model.TableInfo][]*
}

func (b *PlanBuilder) buildDelete(ctx context.Context, delete *ast.DeleteStmt) (Plan, error) {
if b.pushTableHints(delete.TableHints) {
// table hints are only visible in the current DELETE statement.
defer b.popTableHints()
}
b.pushTableHints(delete.TableHints)
// table hints are only visible in the current DELETE statement.
defer b.popTableHints()

sel := &ast.SelectStmt{
Fields: &ast.FieldList{},
Expand Down
5 changes: 5 additions & 0 deletions planner/core/logical_plans.go
Expand Up @@ -98,6 +98,8 @@ const (
preferRightAsIndexInner
preferHashJoin
preferMergeJoin
preferHashAgg
preferStreamAgg
)

// LogicalJoin is the logical join plan.
Expand Down Expand Up @@ -247,6 +249,9 @@ type LogicalAggregation struct {
// groupByCols stores the columns that are group-by items.
groupByCols []*expression.Column

// preferAggType stores preferred aggregation algorithm type.
preferAggType uint

possibleProperties [][]*expression.Column
inputCount float64 // inputCount is the input count of this plan.
}
Expand Down

0 comments on commit 2dd2878

Please sign in to comment.