Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pushdown simplest sorting for find command #2506

Merged
merged 16 commits into from
Apr 28, 2023
2 changes: 2 additions & 0 deletions cmd/ferretdb/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ var cli struct {
Test struct {
RecordsDir string `default:"" help:"Experimental: directory for record files."`
DisableFilterPushdown bool `default:"false" help:"Experimental: disable filter pushdown."`
EnableSortingPushdown bool `default:"false" help:"Experimental: enable sorting pushdown."`
EnableCursors bool `default:"false" help:"Experimental: enable cursors."`

//nolint:lll // for readability
Expand Down Expand Up @@ -339,6 +340,7 @@ func run() {

TestOpts: registry.TestOpts{
DisableFilterPushdown: cli.Test.DisableFilterPushdown,
EnableSortPushdown: cli.Test.EnableSortingPushdown,
EnableCursors: cli.Test.EnableCursors,
},
})
Expand Down
6 changes: 3 additions & 3 deletions internal/handlers/common/sort.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func SortDocuments(docs []*types.Document, sortDoc *types.Document) error {
for i, sortKey := range sortDoc.Keys() {
sortField := must.NotFail(sortDoc.Get(sortKey))

sortType, err := getSortType(sortKey, sortField)
sortType, err := GetSortType(sortKey, sortField)
if err != nil {
return err
}
Expand Down Expand Up @@ -125,8 +125,8 @@ func (ds *docsSorter) Less(i, j int) bool {
return ds.sorts[k](p, q)
}

// getSortType determines SortType from input sort value.
func getSortType(key string, value any) (types.SortType, error) {
// GetSortType determines SortType from input sort value.
func GetSortType(key string, value any) (types.SortType, error) {
sortValue, err := GetWholeNumberParam(value)
if err != nil {
switch {
Expand Down
2 changes: 1 addition & 1 deletion internal/handlers/pg/msg_aggregate.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ func processStagesDocuments(ctx context.Context, p *stagesDocumentsParams) ([]*t

if err := p.dbPool.InTransaction(ctx, func(tx pgx.Tx) error {
var err error
iter, err := pgdb.QueryDocuments(ctx, tx, p.qp)
iter, _, err := pgdb.QueryDocuments(ctx, tx, p.qp)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion internal/handlers/pg/msg_delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ func execDelete(ctx context.Context, dp *execDeleteParams) (int32, error) {
}

err := dp.dbPool.InTransaction(ctx, func(tx pgx.Tx) error {
iter, err := pgdb.QueryDocuments(ctx, tx, dp.qp)
iter, _, err := pgdb.QueryDocuments(ctx, tx, dp.qp)
if err != nil {
return err
}
Expand Down
20 changes: 14 additions & 6 deletions internal/handlers/pg/msg_explain.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,11 @@
return nil, lazyerrors.Error(err)
}

qp.Sort, err = common.GetOptionalParam[*types.Document](explain, "sort", nil)
if err != nil {
return nil, lazyerrors.Error(err)
}

Check warning on line 79 in internal/handlers/pg/msg_explain.go

View check run for this annotation

Codecov / codecov/patch

internal/handlers/pg/msg_explain.go#L78-L79

Added lines #L78 - L79 were not covered by tests

if command.Command() == "aggregate" {
var pipeline *types.Array
pipeline, err = common.GetRequiredParam[*types.Array](explain, "pipeline")
Expand Down Expand Up @@ -103,20 +108,22 @@
qp.Filter = nil
}

if !h.EnableSortPushdown {
qp.Sort = nil
}

var queryPlanner *types.Document
var results pgdb.QueryResults

err = dbPool.InTransaction(ctx, func(tx pgx.Tx) error {
var err error
queryPlanner, err = pgdb.Explain(ctx, tx, &qp)
queryPlanner, results, err = pgdb.Explain(ctx, tx, &qp)
return err
})
if err != nil {
return nil, err
}

// if the plan returned filter info or index info, it means that pushdown had been done
pushdown := queryPlanner.HasByPath(types.NewStaticPath("Plan", "Filter")) ||
queryPlanner.HasByPath(types.NewStaticPath("Plan", "Index Cond"))

hostname, err := os.Hostname()
if err != nil {
return nil, lazyerrors.Error(err)
Expand All @@ -138,7 +145,8 @@
"queryPlanner", queryPlanner,
"explainVersion", "1",
"command", cmd,
"pushdown", pushdown,
"pushdown", results.FilterPushdown,
"sortingPushdown", results.SortPushdown,
"serverInfo", serverInfo,
"ok", float64(1),
))},
Expand Down
33 changes: 20 additions & 13 deletions internal/handlers/pg/msg_find.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,15 +74,20 @@
qp.Filter = params.Filter
}

if h.EnableSortPushdown && params.Projection == nil {
qp.Sort = params.Sort
}

Check warning on line 79 in internal/handlers/pg/msg_find.go

View check run for this annotation

Codecov / codecov/patch

internal/handlers/pg/msg_find.go#L78-L79

Added lines #L78 - L79 were not covered by tests

var resDocs []*types.Document
err = dbPool.InTransaction(ctx, func(tx pgx.Tx) error {
if params.BatchSize == 0 {
return nil
}

var iter types.DocumentsIterator
var queryRes pgdb.QueryResults

iter, err = pgdb.QueryDocuments(ctx, tx, qp)
iter, queryRes, err = pgdb.QueryDocuments(ctx, tx, qp)
if err != nil {
return lazyerrors.Error(err)
}
Expand All @@ -92,18 +97,20 @@

iter = common.FilterIterator(iter, closer, params.Filter)

iter, err = common.SortIterator(iter, closer, params.Sort)
if err != nil {
var pathErr *types.DocumentPathError
if errors.As(err, &pathErr) && pathErr.Code() == types.ErrDocumentPathEmptyKey {
return commonerrors.NewCommandErrorMsgWithArgument(
commonerrors.ErrPathContainsEmptyElement,
"Empty field names in path are not allowed",
document.Command(),
)
if !queryRes.SortPushdown {
iter, err = common.SortIterator(iter, closer, params.Sort)
if err != nil {
var pathErr *types.DocumentPathError
if errors.As(err, &pathErr) && pathErr.Code() == types.ErrDocumentPathEmptyKey {
return commonerrors.NewCommandErrorMsgWithArgument(
commonerrors.ErrPathContainsEmptyElement,
"Empty field names in path are not allowed",
document.Command(),
)
}

return lazyerrors.Error(err)
}

return lazyerrors.Error(err)
}

iter = common.SkipIterator(iter, closer, params.Skip)
Expand Down Expand Up @@ -181,7 +188,7 @@
fp.qp.Filter = nil
}

iter, err := pgdb.QueryDocuments(ctx, fp.tx, fp.qp)
iter, _, err := pgdb.QueryDocuments(ctx, fp.tx, fp.qp)
if err != nil {
return nil, err
}
Expand Down
1 change: 1 addition & 0 deletions internal/handlers/pg/pg.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ type NewOpts struct {

// test options
DisableFilterPushdown bool
EnableSortPushdown bool
EnableCursors bool
}

Expand Down
2 changes: 1 addition & 1 deletion internal/handlers/pg/pgdb/collections.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func Collections(ctx context.Context, tx pgx.Tx, db string) ([]string, error) {
return []string{}, nil
}

iter, err := buildIterator(ctx, tx, &iteratorParams{
iter, _, err := buildIterator(ctx, tx, &iteratorParams{
schema: db,
table: dbMetadataTableName,
})
Expand Down
2 changes: 1 addition & 1 deletion internal/handlers/pg/pgdb/database_metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ func (ms *metadataStorage) get(ctx context.Context, forUpdate bool) (*metadata,
forUpdate: forUpdate,
}

iter, err := buildIterator(ctx, ms.tx, iterParams)
iter, _, err := buildIterator(ctx, ms.tx, iterParams)
if err != nil {
return nil, lazyerrors.Error(err)
}
Expand Down