Skip to content

Commit

Permalink
planner: add more checks when pushing TopN down (pingcap#41370)
Browse files Browse the repository at this point in the history
  • Loading branch information
Dousir9 authored and ghazalfamilyusa committed Feb 15, 2023
1 parent 2f9e7b2 commit 68a61de
Show file tree
Hide file tree
Showing 3 changed files with 134 additions and 14 deletions.
6 changes: 3 additions & 3 deletions planner/core/casetest/rule_derive_topn_from_window_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import (

"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/parser/model"
plannercore "github.com/pingcap/tidb/planner/core"
"github.com/pingcap/tidb/planner/core/internal"
"github.com/pingcap/tidb/testkit"
"github.com/pingcap/tidb/testkit/testdata"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -76,15 +76,15 @@ func TestPushDerivedTopnFlash(t *testing.T) {
tk.MustExec("use test")
tk.MustExec("drop table if exists t")
tk.MustExec("create table t(a int, b int, primary key(b,a))")
SetTiFlashReplica(t, dom, "test", "t")
internal.SetTiFlashReplica(t, dom, "test", "t")
tk.MustExec("set tidb_enforce_mpp=1")
tk.MustExec("set @@session.tidb_allow_mpp=ON;")
var input Input
var output []struct {
SQL string
Plan []string
}
suiteData := plannercore.GetDerivedTopNSuiteData()
suiteData := GetDerivedTopNSuiteData()
suiteData.LoadTestCases(t, &input, &output)
for i, sql := range input {
plan := tk.MustQuery("explain format = 'brief' " + sql)
Expand Down
79 changes: 79 additions & 0 deletions planner/core/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5130,3 +5130,82 @@ func TestIsIPv6ToTiFlash(t *testing.T) {
}
tk.MustQuery("explain select is_ipv6(v6) from t;").CheckAt([]int{0, 2, 4}, rows)
}

// https://github.com/pingcap/tidb/issues/41355
// The "virtual generated column" push down is not supported now.
// This test covers: TopN, Projection, Selection.
func TestVirtualExprPushDown(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test;")
tk.MustExec("drop table if exists t;")
tk.MustExec("CREATE TABLE t (c1 int DEFAULT 0, c2 int GENERATED ALWAYS AS (abs(c1)) VIRTUAL);")
tk.MustExec("insert into t(c1) values(1), (-1), (2), (-2), (99), (-99);")
tk.MustExec("set @@tidb_isolation_read_engines = 'tikv'")

// TopN to tikv.
rows := [][]interface{}{
{"TopN_7", "root", "test.t.c2, offset:0, count:2"},
{"└─TableReader_13", "root", "data:TableFullScan_12"},
{" └─TableFullScan_12", "cop[tikv]", "keep order:false, stats:pseudo"},
}
tk.MustQuery("explain select * from t order by c2 limit 2;").CheckAt([]int{0, 2, 4}, rows)

// Projection to tikv.
rows = [][]interface{}{
{"Projection_3", "root", "plus(test.t.c1, test.t.c2)->Column#4"},
{"└─TableReader_5", "root", "data:TableFullScan_4"},
{" └─TableFullScan_4", "cop[tikv]", "keep order:false, stats:pseudo"},
}
tk.MustExec("set session tidb_opt_projection_push_down='ON';")
tk.MustQuery("explain select c1 + c2 from t;").CheckAt([]int{0, 2, 4}, rows)
tk.MustExec("set session tidb_opt_projection_push_down='OFF';")

// Selection to tikv.
rows = [][]interface{}{
{"Selection_7", "root", "gt(test.t.c2, 1)"},
{"└─TableReader_6", "root", "data:TableFullScan_5"},
{" └─TableFullScan_5", "cop[tikv]", "keep order:false, stats:pseudo"},
}
tk.MustQuery("explain select * from t where c2 > 1;").CheckAt([]int{0, 2, 4}, rows)

tk.MustExec("set @@tidb_allow_mpp=1; set @@tidb_enforce_mpp=1")
tk.MustExec("set @@tidb_isolation_read_engines = 'tiflash'")
is := dom.InfoSchema()
db, exists := is.SchemaByName(model.NewCIStr("test"))
require.True(t, exists)
for _, tblInfo := range db.Tables {
if tblInfo.Name.L == "t" {
tblInfo.TiFlashReplica = &model.TiFlashReplicaInfo{
Count: 1,
Available: true,
}
}
}

// TopN to tiflash.
rows = [][]interface{}{
{"TopN_7", "root", "test.t.c2, offset:0, count:2"},
{"└─TableReader_15", "root", "data:TableFullScan_14"},
{" └─TableFullScan_14", "cop[tiflash]", "keep order:false, stats:pseudo"},
}
tk.MustQuery("explain select * from t order by c2 limit 2;").CheckAt([]int{0, 2, 4}, rows)

// Projection to tiflash.
rows = [][]interface{}{
{"Projection_3", "root", "plus(test.t.c1, test.t.c2)->Column#4"},
{"└─TableReader_6", "root", "data:TableFullScan_5"},
{" └─TableFullScan_5", "cop[tiflash]", "keep order:false, stats:pseudo"},
}
tk.MustExec("set session tidb_opt_projection_push_down='ON';")
tk.MustQuery("explain select c1 + c2 from t;").CheckAt([]int{0, 2, 4}, rows)
tk.MustExec("set session tidb_opt_projection_push_down='OFF';")

// Selection to tiflash.
rows = [][]interface{}{
{"Selection_8", "root", "gt(test.t.c2, 1)"},
{"└─TableReader_7", "root", "data:TableFullScan_6"},
{" └─TableFullScan_6", "cop[tiflash]", "keep order:false, stats:pseudo"},
}
tk.MustQuery("explain select * from t where c2 > 1;").CheckAt([]int{0, 2, 4}, rows)
}
63 changes: 52 additions & 11 deletions planner/core/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -895,15 +895,6 @@ func (p *PhysicalLimit) sinkIntoIndexLookUp(t task) bool {
return true
}

// canPushDown checks if this topN can be pushed down. If each of the expression can be converted to pb, it can be pushed.
func (p *PhysicalTopN) canPushDown(storeTp kv.StoreType) bool {
exprs := make([]expression.Expression, 0, len(p.ByItems))
for _, item := range p.ByItems {
exprs = append(exprs, item.Expr)
}
return expression.CanExprsPushDown(p.ctx.GetSessionVars().StmtCtx, exprs, p.ctx.GetClient(), storeTp)
}

func (p *PhysicalSort) attach2Task(tasks ...task) task {
t := tasks[0].copy()
t = attachPlan2Task(p, t)
Expand Down Expand Up @@ -955,14 +946,64 @@ func (p *PhysicalTopN) canPushToIndexPlan(indexPlan PhysicalPlan, byItemCols []*
return true
}

// canExpressionConvertedToPB checks whether each of the the expression in TopN can be converted to pb.
func (p *PhysicalTopN) canExpressionConvertedToPB(storeTp kv.StoreType) bool {
exprs := make([]expression.Expression, 0, len(p.ByItems))
for _, item := range p.ByItems {
exprs = append(exprs, item.Expr)
}
return expression.CanExprsPushDown(p.ctx.GetSessionVars().StmtCtx, exprs, p.ctx.GetClient(), storeTp)
}

// containVirtualColumn checks whether TopN.ByItems contains virtual generated columns.
func (p *PhysicalTopN) containVirtualColumn(tCols []*expression.Column) bool {
for _, by := range p.ByItems {
cols := expression.ExtractColumns(by.Expr)
for _, col := range cols {
for _, tCol := range tCols {
// A column with ID > 0 indicates that the column can be resolved by data source.
if tCol.ID > 0 && tCol.ID == col.ID && tCol.VirtualExpr != nil {
return true
}
}
}
}
return false
}

// canPushDownToTiKV checks whether this topN can be pushed down to TiKV.
func (p *PhysicalTopN) canPushDownToTiKV(copTask *copTask) bool {
if !p.canExpressionConvertedToPB(kv.TiKV) {
return false
}
if len(copTask.rootTaskConds) != 0 {
return false
}
if p.containVirtualColumn(copTask.plan().Schema().Columns) {
return false
}
return true
}

// canPushDownToTiFlash checks whether this topN can be pushed down to TiFlash.
func (p *PhysicalTopN) canPushDownToTiFlash(mppTask *mppTask) bool {
if !p.canExpressionConvertedToPB(kv.TiFlash) {
return false
}
if p.containVirtualColumn(mppTask.plan().Schema().Columns) {
return false
}
return true
}

func (p *PhysicalTopN) attach2Task(tasks ...task) task {
t := tasks[0].copy()
cols := make([]*expression.Column, 0, len(p.ByItems))
for _, item := range p.ByItems {
cols = append(cols, expression.ExtractColumns(item.Expr)...)
}
needPushDown := len(cols) > 0
if copTask, ok := t.(*copTask); ok && needPushDown && p.canPushDown(copTask.getStoreType()) && len(copTask.rootTaskConds) == 0 {
if copTask, ok := t.(*copTask); ok && needPushDown && p.canPushDownToTiKV(copTask) {
newTask, changed := p.pushTopNDownToDynamicPartition(copTask)
if changed {
return newTask
Expand All @@ -978,7 +1019,7 @@ func (p *PhysicalTopN) attach2Task(tasks ...task) task {
pushedDownTopN = p.getPushedDownTopN(copTask.tablePlan)
copTask.tablePlan = pushedDownTopN
}
} else if mppTask, ok := t.(*mppTask); ok && needPushDown && p.canPushDown(kv.TiFlash) {
} else if mppTask, ok := t.(*mppTask); ok && needPushDown && p.canPushDownToTiFlash(mppTask) {
pushedDownTopN := p.getPushedDownTopN(mppTask.p)
mppTask.p = pushedDownTopN
}
Expand Down

0 comments on commit 68a61de

Please sign in to comment.