Skip to content

Commit

Permalink
planner: move base plan related output of core pkg and make it well-p…
Browse files Browse the repository at this point in the history
  • Loading branch information
AilinKid authored and 3AceShowHand committed Apr 16, 2024
1 parent 9d438fb commit 0a2bb55
Show file tree
Hide file tree
Showing 113 changed files with 1,226 additions and 1,081 deletions.
2 changes: 2 additions & 0 deletions pkg/executor/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,7 @@ go_library(
"//pkg/planner/cardinality",
"//pkg/planner/context",
"//pkg/planner/core",
"//pkg/planner/core/base",
"//pkg/planner/util",
"//pkg/planner/util/fixcontrol",
"//pkg/plugin",
Expand Down Expand Up @@ -410,6 +411,7 @@ go_test(
"//pkg/parser/terror",
"//pkg/planner",
"//pkg/planner/core",
"//pkg/planner/core/base",
"//pkg/planner/property",
"//pkg/planner/util",
"//pkg/server",
Expand Down
11 changes: 6 additions & 5 deletions pkg/executor/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ import (
"github.com/pingcap/tidb/pkg/parser/terror"
"github.com/pingcap/tidb/pkg/planner"
plannercore "github.com/pingcap/tidb/pkg/planner/core"
"github.com/pingcap/tidb/pkg/planner/core/base"
"github.com/pingcap/tidb/pkg/plugin"
"github.com/pingcap/tidb/pkg/sessionctx"
"github.com/pingcap/tidb/pkg/sessionctx/sessionstates"
Expand Down Expand Up @@ -221,7 +222,7 @@ type ExecStmt struct {
// InfoSchema stores a reference to the schema information.
InfoSchema infoschema.InfoSchema
// Plan stores a reference to the final physical plan.
Plan plannercore.Plan
Plan base.Plan
// Text represents the origin query text.
Text string

Expand Down Expand Up @@ -393,7 +394,7 @@ func (a *ExecStmt) RebuildPlan(ctx context.Context) (int64, error) {
}

// IsFastPlan exports for testing.
func IsFastPlan(p plannercore.Plan) bool {
func IsFastPlan(p base.Plan) bool {
if proj, ok := p.(*plannercore.PhysicalProjection); ok {
p = proj.Children()[0]
}
Expand Down Expand Up @@ -802,7 +803,7 @@ func (a *ExecStmt) handleNoDelay(ctx context.Context, e exec.Executor, isPessimi
return false, nil, nil
}

func isNoResultPlan(p plannercore.Plan) bool {
func isNoResultPlan(p base.Plan) bool {
if p.Schema().Len() == 0 {
return true
}
Expand Down Expand Up @@ -1711,7 +1712,7 @@ func collectWarningsForSlowLog(stmtCtx *stmtctx.StatementContext) []variable.JSO
}

// GetResultRowsCount gets the count of the statement result rows.
func GetResultRowsCount(stmtCtx *stmtctx.StatementContext, p plannercore.Plan) int64 {
func GetResultRowsCount(stmtCtx *stmtctx.StatementContext, p base.Plan) int64 {
runtimeStatsColl := stmtCtx.RuntimeStatsColl
if runtimeStatsColl == nil {
return 0
Expand All @@ -1735,7 +1736,7 @@ func getFlatPlan(stmtCtx *stmtctx.StatementContext) *plannercore.FlatPhysicalPla
f := flat.(*plannercore.FlatPhysicalPlan)
return f
}
p := pp.(plannercore.Plan)
p := pp.(base.Plan)
flat := plannercore.FlattenPhysicalPlan(p, false)
if flat != nil {
stmtCtx.SetFlatPlan(flat)
Expand Down
19 changes: 10 additions & 9 deletions pkg/executor/benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
"github.com/pingcap/tidb/pkg/parser/ast"
"github.com/pingcap/tidb/pkg/parser/mysql"
"github.com/pingcap/tidb/pkg/planner/core"
"github.com/pingcap/tidb/pkg/planner/core/base"
"github.com/pingcap/tidb/pkg/planner/property"
"github.com/pingcap/tidb/pkg/planner/util"
"github.com/pingcap/tidb/pkg/sessionctx"
Expand All @@ -51,7 +52,7 @@ import (

var (
_ exec.Executor = &testutil.MockDataSource{}
_ core.PhysicalPlan = &testutil.MockDataPhysicalPlan{}
_ base.PhysicalPlan = &testutil.MockDataPhysicalPlan{}
wideString = strings.Repeat("x", 5*1024)
)

Expand Down Expand Up @@ -80,7 +81,7 @@ func buildStreamAggExecutor(ctx sessionctx.Context, srcExec exec.Executor, schem
sg.SetSchema(schema)
sg.Init(ctx.GetPlanCtx(), nil, 0)

var tail core.PhysicalPlan = sg
var tail base.PhysicalPlan = sg
// if data source is not sorted, we have to attach sort, to make the input of stream-agg sorted
if !dataSourceSorted {
byItems := make([]*util.ByItems, 0, len(sg.GroupByItems))
Expand All @@ -96,7 +97,7 @@ func buildStreamAggExecutor(ctx sessionctx.Context, srcExec exec.Executor, schem
}

var (
plan core.PhysicalPlan
plan base.PhysicalPlan
splitter core.PartitionSplitterType = core.PartitionHashSplitterType
)
if concurrency > 1 {
Expand All @@ -105,8 +106,8 @@ func buildStreamAggExecutor(ctx sessionctx.Context, srcExec exec.Executor, schem
}
plan = core.PhysicalShuffle{
Concurrency: concurrency,
Tails: []core.PhysicalPlan{tail},
DataSources: []core.PhysicalPlan{src},
Tails: []base.PhysicalPlan{tail},
DataSources: []base.PhysicalPlan{src},
SplitterType: splitter,
ByItemArrays: [][]expression.Expression{sg.GroupByItems},
}.Init(ctx.GetPlanCtx(), nil, 0)
Expand Down Expand Up @@ -315,7 +316,7 @@ func buildWindowExecutor(ctx sessionctx.Context, windowFunc string, funcs int, f
win.SetSchema(winSchema)
win.Init(ctx.GetPlanCtx(), nil, 0)

var tail core.PhysicalPlan = win
var tail base.PhysicalPlan = win
if !dataSourceSorted {
byItems := make([]*util.ByItems, 0, len(partitionBy))
for _, col := range partitionBy {
Expand All @@ -329,7 +330,7 @@ func buildWindowExecutor(ctx sessionctx.Context, windowFunc string, funcs int, f
win.SetChildren(src)
}

var plan core.PhysicalPlan
var plan base.PhysicalPlan
if concurrency > 1 {
byItems := make([]expression.Expression, 0, len(win.PartitionBy))
for _, item := range win.PartitionBy {
Expand All @@ -338,8 +339,8 @@ func buildWindowExecutor(ctx sessionctx.Context, windowFunc string, funcs int, f

plan = core.PhysicalShuffle{
Concurrency: concurrency,
Tails: []core.PhysicalPlan{tail},
DataSources: []core.PhysicalPlan{src},
Tails: []base.PhysicalPlan{tail},
DataSources: []base.PhysicalPlan{src},
SplitterType: core.PartitionHashSplitterType,
ByItemArrays: [][]expression.Expression{byItems},
}.Init(ctx.GetPlanCtx(), nil, 0)
Expand Down
27 changes: 14 additions & 13 deletions pkg/executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ import (
"github.com/pingcap/tidb/pkg/parser/mysql"
"github.com/pingcap/tidb/pkg/parser/terror"
plannercore "github.com/pingcap/tidb/pkg/planner/core"
"github.com/pingcap/tidb/pkg/planner/core/base"
plannerutil "github.com/pingcap/tidb/pkg/planner/util"
"github.com/pingcap/tidb/pkg/sessionctx"
"github.com/pingcap/tidb/pkg/sessionctx/stmtctx"
Expand Down Expand Up @@ -150,11 +151,11 @@ func NewMockExecutorBuilderForTest(ctx sessionctx.Context, is infoschema.InfoSch
}

// Build builds an executor tree according to `p`.
func (b *MockExecutorBuilder) Build(p plannercore.Plan) exec.Executor {
func (b *MockExecutorBuilder) Build(p base.Plan) exec.Executor {
return b.build(p)
}

func (b *executorBuilder) build(p plannercore.Plan) exec.Executor {
func (b *executorBuilder) build(p base.Plan) exec.Executor {
switch v := p.(type) {
case nil:
return nil
Expand Down Expand Up @@ -2160,8 +2161,8 @@ func (b *executorBuilder) buildTopN(v *plannercore.PhysicalTopN) exec.Executor {

func (b *executorBuilder) buildApply(v *plannercore.PhysicalApply) exec.Executor {
var (
innerPlan plannercore.PhysicalPlan
outerPlan plannercore.PhysicalPlan
innerPlan base.PhysicalPlan
outerPlan base.PhysicalPlan
)
if v.InnerChildIdx == 0 {
innerPlan = v.Children()[0]
Expand Down Expand Up @@ -2865,7 +2866,7 @@ func markChildrenUsedCols(outputCols []*expression.Column, childSchemas ...*expr
return
}

func (*executorBuilder) corColInDistPlan(plans []plannercore.PhysicalPlan) bool {
func (*executorBuilder) corColInDistPlan(plans []base.PhysicalPlan) bool {
for _, p := range plans {
switch x := p.(type) {
case *plannercore.PhysicalSelection:
Expand All @@ -2892,7 +2893,7 @@ func (*executorBuilder) corColInDistPlan(plans []plannercore.PhysicalPlan) bool
}

// corColInAccess checks whether there's correlated column in access conditions.
func (*executorBuilder) corColInAccess(p plannercore.PhysicalPlan) bool {
func (*executorBuilder) corColInAccess(p base.PhysicalPlan) bool {
var access []expression.Expression
switch x := p.(type) {
case *plannercore.PhysicalTableScan:
Expand All @@ -2908,7 +2909,7 @@ func (*executorBuilder) corColInAccess(p plannercore.PhysicalPlan) bool {
return false
}

func (b *executorBuilder) newDataReaderBuilder(p plannercore.PhysicalPlan) (*dataReaderBuilder, error) {
func (b *executorBuilder) newDataReaderBuilder(p base.PhysicalPlan) (*dataReaderBuilder, error) {
ts, err := b.getSnapshotTS()
if err != nil {
return nil, err
Expand Down Expand Up @@ -3183,7 +3184,7 @@ func (b *executorBuilder) buildIndexNestedLoopHashJoin(v *plannercore.PhysicalIn
func buildNoRangeTableReader(b *executorBuilder, v *plannercore.PhysicalTableReader) (*TableReaderExecutor, error) {
tablePlans := v.TablePlans
if v.StoreType == kv.TiFlash {
tablePlans = []plannercore.PhysicalPlan{v.GetTablePlan()}
tablePlans = []base.PhysicalPlan{v.GetTablePlan()}
}
dagReq, err := builder.ConstructDAGReq(b.ctx, tablePlans, v.StoreType)
if err != nil {
Expand Down Expand Up @@ -3644,7 +3645,7 @@ func (b *executorBuilder) buildIndexReader(v *plannercore.PhysicalIndexReader) e
return ret
}

func buildTableReq(b *executorBuilder, schemaLen int, plans []plannercore.PhysicalPlan) (dagReq *tipb.DAGRequest, val table.Table, err error) {
func buildTableReq(b *executorBuilder, schemaLen int, plans []base.PhysicalPlan) (dagReq *tipb.DAGRequest, val table.Table, err error) {
tableReq, err := builder.ConstructDAGReq(b.ctx, plans, kv.TiKV)
if err != nil {
return nil, nil, err
Expand All @@ -3665,7 +3666,7 @@ func buildTableReq(b *executorBuilder, schemaLen int, plans []plannercore.Physic
// buildIndexReq is designed to create a DAG for index request.
// If len(ByItems) != 0 means index request should return related columns
// to sort result rows in TiDB side for parition tables.
func buildIndexReq(ctx sessionctx.Context, columns []*model.IndexColumn, handleLen int, plans []plannercore.PhysicalPlan) (dagReq *tipb.DAGRequest, err error) {
func buildIndexReq(ctx sessionctx.Context, columns []*model.IndexColumn, handleLen int, plans []base.PhysicalPlan) (dagReq *tipb.DAGRequest, err error) {
indexReq, err := builder.ConstructDAGReq(ctx, plans, kv.TiKV)
if err != nil {
return nil, err
Expand Down Expand Up @@ -4001,7 +4002,7 @@ func (b *executorBuilder) buildIndexMergeReader(v *plannercore.PhysicalIndexMerg
// 1. dataReaderBuilder calculate data range from argument, rather than plan.
// 2. the result executor is already opened.
type dataReaderBuilder struct {
plan plannercore.Plan
plan base.Plan
*executorBuilder

selectResultHook // for testing
Expand All @@ -4013,7 +4014,7 @@ type dataReaderBuilder struct {
}

type mockPhysicalIndexReader struct {
plannercore.PhysicalPlan
base.PhysicalPlan

e exec.Executor
}
Expand All @@ -4028,7 +4029,7 @@ func (builder *dataReaderBuilder) buildExecutorForIndexJoin(ctx context.Context,
return builder.buildExecutorForIndexJoinInternal(ctx, builder.plan, lookUpContents, indexRanges, keyOff2IdxOff, cwc, canReorderHandles, memTracker, interruptSignal)
}

func (builder *dataReaderBuilder) buildExecutorForIndexJoinInternal(ctx context.Context, plan plannercore.Plan, lookUpContents []*indexJoinLookUpContent,
func (builder *dataReaderBuilder) buildExecutorForIndexJoinInternal(ctx context.Context, plan base.Plan, lookUpContents []*indexJoinLookUpContent,
indexRanges []*ranger.Range, keyOff2IdxOff []int, cwc *plannercore.ColWithCmpFuncManager, canReorderHandles bool, memTracker *memory.Tracker, interruptSignal *atomic.Value) (exec.Executor, error) {
switch v := plan.(type) {
case *plannercore.PhysicalTableReader:
Expand Down
7 changes: 4 additions & 3 deletions pkg/executor/compiler.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/pingcap/tidb/pkg/parser/mysql"
"github.com/pingcap/tidb/pkg/planner"
plannercore "github.com/pingcap/tidb/pkg/planner/core"
"github.com/pingcap/tidb/pkg/planner/core/base"
"github.com/pingcap/tidb/pkg/sessionctx"
"github.com/pingcap/tidb/pkg/sessiontxn"
"github.com/pingcap/tidb/pkg/sessiontxn/staleread"
Expand Down Expand Up @@ -158,9 +159,9 @@ func (c *Compiler) Compile(ctx context.Context, stmtNode ast.StmtNode) (_ *ExecS
// If the estimated output row count of any operator in the physical plan tree
// is greater than the specific threshold, we'll set it to lowPriority when
// sending it to the coprocessor.
func needLowerPriority(p plannercore.Plan) bool {
func needLowerPriority(p base.Plan) bool {
switch x := p.(type) {
case plannercore.PhysicalPlan:
case base.PhysicalPlan:
return isPhysicalPlanNeedLowerPriority(x)
case *plannercore.Execute:
return needLowerPriority(x.Plan)
Expand All @@ -180,7 +181,7 @@ func needLowerPriority(p plannercore.Plan) bool {
return false
}

func isPhysicalPlanNeedLowerPriority(p plannercore.PhysicalPlan) bool {
func isPhysicalPlanNeedLowerPriority(p base.PhysicalPlan) bool {
expensiveThreshold := int64(config.GetGlobalConfig().Log.ExpensiveThreshold)
if int64(p.StatsCount()) > expensiveThreshold {
return true
Expand Down
9 changes: 5 additions & 4 deletions pkg/executor/distsql.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (
"github.com/pingcap/tidb/pkg/parser/mysql"
"github.com/pingcap/tidb/pkg/parser/terror"
plannercore "github.com/pingcap/tidb/pkg/planner/core"
"github.com/pingcap/tidb/pkg/planner/core/base"
plannerutil "github.com/pingcap/tidb/pkg/planner/util"
"github.com/pingcap/tidb/pkg/sessionctx"
"github.com/pingcap/tidb/pkg/table"
Expand Down Expand Up @@ -204,7 +205,7 @@ type IndexReaderExecutor struct {
corColInAccess bool
idxCols []*expression.Column
colLens []int
plans []plannercore.PhysicalPlan
plans []base.PhysicalPlan

memTracker *memory.Tracker

Expand Down Expand Up @@ -470,8 +471,8 @@ type IndexLookUpExecutor struct {
corColInIdxSide bool
corColInTblSide bool
corColInAccess bool
idxPlans []plannercore.PhysicalPlan
tblPlans []plannercore.PhysicalPlan
idxPlans []base.PhysicalPlan
tblPlans []base.PhysicalPlan
idxCols []*expression.Column
colLens []int
// PushedLimit is used to skip the preceding and tailing handles when Limit is sunk into IndexLookUpReader.
Expand Down Expand Up @@ -1594,7 +1595,7 @@ func GetLackHandles(expectedHandles []kv.Handle, obtainedHandlesMap *kv.HandleMa
return diffHandles
}

func getPhysicalPlanIDs(plans []plannercore.PhysicalPlan) []int {
func getPhysicalPlanIDs(plans []base.PhysicalPlan) []int {
planIDs := make([]int, 0, len(plans))
for _, p := range plans {
planIDs = append(planIDs, p.ID())
Expand Down
3 changes: 2 additions & 1 deletion pkg/executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ import (
"github.com/pingcap/tidb/pkg/parser/terror"
planctx "github.com/pingcap/tidb/pkg/planner/context"
plannercore "github.com/pingcap/tidb/pkg/planner/core"
"github.com/pingcap/tidb/pkg/planner/core/base"
"github.com/pingcap/tidb/pkg/planner/util/fixcontrol"
"github.com/pingcap/tidb/pkg/privilege"
"github.com/pingcap/tidb/pkg/resourcemanager/pool/workerpool"
Expand Down Expand Up @@ -1452,7 +1453,7 @@ func init() {
// While doing optimization in the plan package, we need to execute uncorrelated subquery,
// but the plan package cannot import the executor package because of the dependency cycle.
// So we assign a function implemented in the executor package to the plan package to avoid the dependency cycle.
plannercore.EvalSubqueryFirstRow = func(ctx context.Context, p plannercore.PhysicalPlan, is infoschema.InfoSchema, pctx planctx.PlanContext) ([]types.Datum, error) {
plannercore.EvalSubqueryFirstRow = func(ctx context.Context, p base.PhysicalPlan, is infoschema.InfoSchema, pctx planctx.PlanContext) ([]types.Datum, error) {
defer func(begin time.Time) {
s := pctx.GetSessionVars()
s.StmtCtx.SetSkipPlanCache(errors.NewNoStackError("query has uncorrelated sub-queries is un-cacheable"))
Expand Down
3 changes: 2 additions & 1 deletion pkg/executor/foreign_key.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/pingcap/tidb/pkg/parser/model"
"github.com/pingcap/tidb/pkg/planner"
plannercore "github.com/pingcap/tidb/pkg/planner/core"
"github.com/pingcap/tidb/pkg/planner/core/base"
"github.com/pingcap/tidb/pkg/sessionctx"
"github.com/pingcap/tidb/pkg/sessionctx/stmtctx"
"github.com/pingcap/tidb/pkg/table"
Expand Down Expand Up @@ -719,7 +720,7 @@ func (fkc *FKCascadeExec) buildExecutor(ctx context.Context) (exec.Executor, err
// this is to avoid performance issue, see: https://github.com/pingcap/tidb/issues/38631
var maxHandleFKValueInOneCascade = 1024

func (fkc *FKCascadeExec) buildFKCascadePlan(ctx context.Context) (plannercore.Plan, error) {
func (fkc *FKCascadeExec) buildFKCascadePlan(ctx context.Context) (base.Plan, error) {
if len(fkc.fkValues) == 0 && len(fkc.fkUpdatedValuesMap) == 0 {
return nil, nil
}
Expand Down
1 change: 1 addition & 0 deletions pkg/executor/importer/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ go_test(
"//pkg/parser/model",
"//pkg/parser/mysql",
"//pkg/planner/core",
"//pkg/planner/core/base",
"//pkg/planner/util",
"//pkg/session",
"//pkg/sessionctx/variable",
Expand Down
3 changes: 2 additions & 1 deletion pkg/executor/importer/importer_testkit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (
"github.com/pingcap/tidb/pkg/parser/ast"
"github.com/pingcap/tidb/pkg/parser/model"
plannercore "github.com/pingcap/tidb/pkg/planner/core"
"github.com/pingcap/tidb/pkg/planner/core/base"
"github.com/pingcap/tidb/pkg/session"
"github.com/pingcap/tidb/pkg/sessionctx/variable"
"github.com/pingcap/tidb/pkg/testkit"
Expand Down Expand Up @@ -261,7 +262,7 @@ func getTableImporter(ctx context.Context, t *testing.T, store kv.Storage, table
require.True(t, ok)
table, err := do.InfoSchema().TableByName(model.NewCIStr("test"), model.NewCIStr(tableName))
require.NoError(t, err)
var selectPlan plannercore.PhysicalPlan
var selectPlan base.PhysicalPlan
if path == "" {
selectPlan = &plannercore.PhysicalSelection{}
}
Expand Down

0 comments on commit 0a2bb55

Please sign in to comment.