Skip to content

Commit

Permalink
planner: move invalidTask to task base pkg. (#53267)
Browse files Browse the repository at this point in the history
ref #51664, ref #52714
  • Loading branch information
AilinKid committed May 15, 2024
1 parent acdb6f5 commit 6aef624
Show file tree
Hide file tree
Showing 7 changed files with 84 additions and 72 deletions.
3 changes: 3 additions & 0 deletions pkg/planner/core/base/task_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,3 +32,6 @@ type Task interface {
// MemoryUsage returns the memory usage of current task.
MemoryUsage() int64
}

// InvalidTask is just a common invalid singleton instance initialized by core's empty RootTask.
var InvalidTask Task
2 changes: 2 additions & 0 deletions pkg/planner/core/core_init.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package core
import (
"github.com/pingcap/tidb/pkg/expression"
"github.com/pingcap/tidb/pkg/planner/cardinality"
"github.com/pingcap/tidb/pkg/planner/core/base"
plannerutil "github.com/pingcap/tidb/pkg/planner/util"
"github.com/pingcap/tidb/pkg/planner/util/utilfuncp"
"github.com/pingcap/tidb/pkg/statistics"
Expand All @@ -38,6 +39,7 @@ func init() {
statistics.PrepareCols4MVIndex = PrepareIdxColsAndUnwrapArrayType

// For basic optimizer init.
base.InvalidTask = &RootTask{} // invalid if p is nil
expression.EvalSimpleAst = evalAstExpr
expression.BuildSimpleExpr = buildSimpleExpr
expression.DecodeKeyFromString = decodeKeyFromString
Expand Down
115 changes: 61 additions & 54 deletions pkg/planner/core/find_best_task.go

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions pkg/planner/core/plan.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,11 @@ func enforceProperty(p *property.PhysicalProperty, tsk base.Task, ctx base.PlanC
if p.TaskTp == property.MppTaskType {
mpp, ok := tsk.(*MppTask)
if !ok || mpp.Invalid() {
return invalidTask
return base.InvalidTask
}
if !p.IsSortItemAllForPartition() {
ctx.GetSessionVars().RaiseWarningWhenMPPEnforced("MPP mode may be blocked because operator `Sort` is not supported now.")
return invalidTask
return base.InvalidTask
}
tsk = mpp.enforceExchanger(p)
}
Expand Down
28 changes: 14 additions & 14 deletions pkg/planner/core/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -387,12 +387,12 @@ func (p *PhysicalHashJoin) attach2TaskForMpp(tasks ...base.Task) base.Task {
lTask, lok := tasks[0].(*MppTask)
rTask, rok := tasks[1].(*MppTask)
if !lok || !rok {
return invalidTask
return base.InvalidTask
}
if p.mppShuffleJoin {
// protection check is case of some bugs
if len(lTask.hashCols) != len(rTask.hashCols) || len(lTask.hashCols) == 0 {
return invalidTask
return base.InvalidTask
}
lTask, rTask = p.convertPartitionKeysIfNeed(lTask, rTask)
}
Expand Down Expand Up @@ -1017,7 +1017,7 @@ func (p *PhysicalExpand) Attach2Task(tasks ...base.Task) base.Task {
mpp.p = p
return mpp
}
return invalidTask
return base.InvalidTask
}

// Attach2Task implements PhysicalPlan interface.
Expand Down Expand Up @@ -1052,11 +1052,11 @@ func (p *PhysicalUnionAll) attach2MppTasks(tasks ...base.Task) base.Task {
} else if root, ok := tk.(*RootTask); ok && root.IsEmpty() {
continue
} else {
return invalidTask
return base.InvalidTask
}
}
if len(childPlans) == 0 {
return invalidTask
return base.InvalidTask
}
p.SetChildren(childPlans...)
return t
Expand All @@ -1069,8 +1069,8 @@ func (p *PhysicalUnionAll) Attach2Task(tasks ...base.Task) base.Task {
if p.TP() == plancodec.TypePartitionUnion {
// In attach2MppTasks(), will attach PhysicalUnion to mppTask directly.
// But PartitionUnion cannot pushdown to tiflash, so here disable PartitionUnion pushdown to tiflash explicitly.
// For now, return invalidTask immediately, we can refine this by letting childTask of PartitionUnion convert to rootTask.
return invalidTask
// For now, return base.InvalidTask immediately, we can refine this by letting childTask of PartitionUnion convert to rootTask.
return base.InvalidTask
}
return p.attach2MppTasks(tasks...)
}
Expand Down Expand Up @@ -1758,7 +1758,7 @@ func (p *PhysicalStreamAgg) Attach2Task(tasks ...base.Task) base.Task {
storeType := cop.getStoreType()
// TiFlash doesn't support Stream Aggregation
if storeType == kv.TiFlash && len(p.GroupByItems) > 0 {
return invalidTask
return base.InvalidTask
}
partialAgg, finalAgg := p.newPartialAggregate(storeType, false)
if partialAgg != nil {
Expand Down Expand Up @@ -2122,7 +2122,7 @@ func (p *PhysicalHashAgg) attach2TaskForMpp(tasks ...base.Task) base.Task {
t := tasks[0].Copy()
mpp, ok := t.(*MppTask)
if !ok {
return invalidTask
return base.InvalidTask
}
switch p.MppRunMode {
case Mpp1Phase:
Expand All @@ -2139,7 +2139,7 @@ func (p *PhysicalHashAgg) attach2TaskForMpp(tasks ...base.Task) base.Task {
proj := p.convertAvgForMPP()
partialAgg, finalAgg := p.newPartialAggregate(kv.TiFlash, true)
if partialAgg == nil {
return invalidTask
return base.InvalidTask
}
attachPlan2Task(partialAgg, mpp)
partitionCols := p.MppPartitionCols
Expand All @@ -2149,7 +2149,7 @@ func (p *PhysicalHashAgg) attach2TaskForMpp(tasks ...base.Task) base.Task {
for _, expr := range items {
col, ok := expr.(*expression.Column)
if !ok {
return invalidTask
return base.InvalidTask
}
partitionCols = append(partitionCols, &property.MPPPartitionColumn{
Col: col,
Expand Down Expand Up @@ -2188,12 +2188,12 @@ func (p *PhysicalHashAgg) attach2TaskForMpp(tasks ...base.Task) base.Task {
proj := p.convertAvgForMPP()
partialAgg, finalAgg := p.newPartialAggregate(kv.TiFlash, true)
if finalAgg == nil {
return invalidTask
return base.InvalidTask
}

final, middle, partial, proj4Partial, err := p.adjust3StagePhaseAgg(partialAgg, finalAgg, canUse3StageAgg, groupingSets, mpp)
if err != nil {
return invalidTask
return base.InvalidTask
}

// partial agg proj would be null if one scalar agg cannot run in two-phase mode
Expand Down Expand Up @@ -2241,7 +2241,7 @@ func (p *PhysicalHashAgg) attach2TaskForMpp(tasks ...base.Task) base.Task {
attachPlan2Task(proj, newMpp)
return newMpp
default:
return invalidTask
return base.InvalidTask
}
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/planner/core/task_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ func (t *MppTask) ConvertToRootTaskImpl(ctx base.PlanContext) *RootTask {
// It's only for TableScan. This is ensured by converting mppTask to rootTask just after TableScan is built,
// so no other operators are added into this mppTask.
logutil.BgLogger().Error("expect Selection or TableScan for mppTask.p", zap.String("mppTask.p", t.p.TP()))
return invalidTask
return base.InvalidTask.(*RootTask)
}
selectivity, _, err := cardinality.Selectivity(ctx, t.tblColHists, t.rootTaskConds, nil)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/planner/util/utilfuncp/func_pointer_misc.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,6 @@ var GetTaskPlanCost func(t base.Task, pop *optimizetrace.PhysicalOptimizeOp) (fl

// AddSelection will add a selection if necessary.
// This function is util function pointer that initialized by core functionality.
// todo: (3) arenatlx, remove this func pointer when inside referred LogicalSelection is moved out of core.
// todo: (4) arenatlx, remove this func pointer when inside referred LogicalSelection is moved out of core.
var AddSelection func(p base.LogicalPlan, child base.LogicalPlan, conditions []expression.Expression,
chIdx int, opt *optimizetrace.LogicalOptimizeOp)

0 comments on commit 6aef624

Please sign in to comment.