Skip to content

Commit

Permalink
checkout cpk serialization
Browse files Browse the repository at this point in the history
  • Loading branch information
gouhongshen committed May 23, 2024
1 parent fe19c52 commit 22c5ec3
Showing 1 changed file with 80 additions and 25 deletions.
105 changes: 80 additions & 25 deletions pkg/vm/engine/disttae/txn_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ package disttae
import (
"context"
"fmt"
util2 "github.com/matrixorigin/matrixone/pkg/util"
"slices"
"sort"
"strconv"
"strings"
Expand Down Expand Up @@ -715,7 +717,6 @@ func (tbl *txnTable) rangesOnePart(
outBlocks *objectio.BlockInfoSlice, // output marshaled block list after filtering
proc *process.Process, // process of this transaction
) (err error) {

uncommittedObjects := tbl.collectUnCommittedObjects()
dirtyBlks := tbl.collectDirtyBlocks(state, uncommittedObjects)

Expand All @@ -728,15 +729,6 @@ func (tbl *txnTable) rangesOnePart(
return nil
}

if done, err = tbl.tryFastRanges(
exprs, state, uncommittedObjects, dirtyBlks, outBlocks,
tbl.getTxn().engine.fs,
); err != nil {
return err
} else if done {
return nil
}

// for dynamic parameter, substitute param ref and const fold cast expression here to improve performance
newExprs, err := plan2.ConstandFoldList(exprs, tbl.proc.Load(), true)
if err == nil {
Expand Down Expand Up @@ -969,6 +961,72 @@ func (tbl *txnTable) collectDirtyBlocks(
return dirtyBlks
}

func visitCPK(expr *plan.Expr, visit []any, pks []string) {
switch exprImpl := expr.Expr.(type) {
case *plan.Expr_F:
switch exprImpl.F.Func.ObjName {
case "and":
visitCPK(exprImpl.F.Args[0], visit, pks)
visitCPK(exprImpl.F.Args[1], visit, pks)

case "=":
var ok bool
var colExpr *plan.Expr_Col

if colExpr, ok = exprImpl.F.Args[0].Expr.(*plan.Expr_Col); !ok {
if colExpr, ok = exprImpl.F.Args[1].Expr.(*plan.Expr_Col); !ok {
return
}
}

if pos := getPosInCompositPK(colExpr.Col.Name, pks); pos != -1 {
visit[pos] = struct{}{}
}
}
}
}

func checkCompositePKSerialization(tbl *txnTable, exprs []*plan.Expr) {
if tbl.tableDef.Pkey.CompPkeyCol == nil {
return
}
visit := make([]any, len(tbl.tableDef.Pkey.Names))

if len(exprs) == 1 {
visitCPK(exprs[0], visit, tbl.tableDef.Pkey.Names)
} else {
// each expr has a part of the cpk
for _, expr := range exprs {
visitCPK(expr, visit, tbl.tableDef.Pkey.Names)
}
}

// other case:
// nil,nil,nil,nil
// nil,b,c,d
// a,nil,c,d

// full matched or prefix matched
// a,b,c,d
// a,b,c,nil
// a,b,nil,nil
// a,nil,nil,nil
idx := slices.Index(visit, nil)
if idx == 0 {
return
}
if idx != -1 {
if idx = slices.Index(visit[idx+1:], any(struct{}{})); idx != -1 {
return
}
}

logutil.Errorf("found unserial composite primary key, tbl: %s, tbldef: %v, exprs: %s",
tbl.tableName, tbl.tableDef, plan2.FormatExprs(exprs))
util2.EnableCoreDump()
util2.CoreDump()
}

// tryFastFilterBlocks is going to replace the tryFastRanges completely soon, in progress now.
func (tbl *txnTable) tryFastFilterBlocks(
exprs []*plan.Expr,
Expand All @@ -977,21 +1035,18 @@ func (tbl *txnTable) tryFastFilterBlocks(
dirtyBlocks map[types.Blockid]struct{},
outBlocks *objectio.BlockInfoSlice,
fs fileservice.FileService) (done bool, err error) {
// TODO: refactor this code if composite key can be pushdown
if tbl.tableDef.Pkey.CompPkeyCol == nil {
return TryFastFilterBlocks(
tbl.db.op.SnapshotTS(),
tbl.tableDef,
exprs,
snapshot,
uncommittedObjects,
dirtyBlocks,
outBlocks,
fs,
tbl.proc.Load(),
)
}
return
checkCompositePKSerialization(tbl, exprs)
return TryFastFilterBlocks(
tbl.db.op.SnapshotTS(),
tbl.tableDef,
exprs,
snapshot,
uncommittedObjects,
dirtyBlocks,
outBlocks,
fs,
tbl.proc.Load(),
)
}

// tryFastRanges only handle equal expression filter on zonemap and bloomfilter in tp scenario;
Expand Down

0 comments on commit 22c5ec3

Please sign in to comment.