Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replace document slices with iterators #2730

Merged
merged 19 commits into from Jun 2, 2023
Merged
19 changes: 16 additions & 3 deletions internal/handlers/common/distinct.go
Expand Up @@ -15,6 +15,7 @@
package common

import (
"errors"
"fmt"
"strings"

Expand All @@ -23,6 +24,7 @@
"github.com/FerretDB/FerretDB/internal/handlers/commonerrors"
"github.com/FerretDB/FerretDB/internal/handlers/commonparams"
"github.com/FerretDB/FerretDB/internal/types"
"github.com/FerretDB/FerretDB/internal/util/iterator"
"github.com/FerretDB/FerretDB/internal/util/lazyerrors"
)

Expand Down Expand Up @@ -84,10 +86,21 @@
//
// If the key is found in the document, and the value is an array, each element of the array is added to the result.
// Otherwise, the value itself is added to the result.
func FilterDistinctValues(docs []*types.Document, key string) (*types.Array, error) {
distinct := types.MakeArray(len(docs))
func FilterDistinctValues(iter types.DocumentsIterator, key string) (*types.Array, error) {
distinct := types.MakeArray(0)

defer iter.Close()

for {
_, doc, err := iter.Next()
if errors.Is(err, iterator.ErrIteratorDone) {
break

Check warning on line 97 in internal/handlers/common/distinct.go

View check run for this annotation

Codecov / codecov/patch

internal/handlers/common/distinct.go#L89-L97

Added lines #L89 - L97 were not covered by tests
}

if err != nil {
return nil, lazyerrors.Error(err)
}

Check warning on line 102 in internal/handlers/common/distinct.go

View check run for this annotation

Codecov / codecov/patch

internal/handlers/common/distinct.go#L100-L102

Added lines #L100 - L102 were not covered by tests

for _, doc := range docs {
// docsAtSuffix contains all documents exist at the suffix key.
docsAtSuffix := []*types.Document{doc}
suffix := key
Expand Down
37 changes: 26 additions & 11 deletions internal/handlers/pg/msg_count.go
Expand Up @@ -22,6 +22,7 @@
"github.com/FerretDB/FerretDB/internal/handlers/common"
"github.com/FerretDB/FerretDB/internal/handlers/pg/pgdb"
"github.com/FerretDB/FerretDB/internal/types"
"github.com/FerretDB/FerretDB/internal/util/iterator"
"github.com/FerretDB/FerretDB/internal/util/lazyerrors"
"github.com/FerretDB/FerretDB/internal/util/must"
"github.com/FerretDB/FerretDB/internal/wire"
Expand Down Expand Up @@ -50,26 +51,40 @@
Collection: params.Collection,
}

qp.Filter = params.Filter
if !h.DisableFilterPushdown {
qp.Filter = params.Filter
}

Check warning on line 56 in internal/handlers/pg/msg_count.go

View check run for this annotation

Codecov / codecov/patch

internal/handlers/pg/msg_count.go#L54-L56

Added lines #L54 - L56 were not covered by tests

var resDocs []*types.Document
err = dbPool.InTransaction(ctx, func(tx pgx.Tx) error {
resDocs, err = fetchAndFilterDocs(ctx, &fetchParams{tx, &qp, h.DisableFilterPushdown})
return err
var iter types.DocumentsIterator

iter, _, err = pgdb.QueryDocuments(ctx, tx, &qp)
if err != nil {
return lazyerrors.Error(err)
}

Check warning on line 65 in internal/handlers/pg/msg_count.go

View check run for this annotation

Codecov / codecov/patch

internal/handlers/pg/msg_count.go#L60-L65

Added lines #L60 - L65 were not covered by tests

closer := iterator.NewMultiCloser(iter)
defer closer.Close()

iter = common.FilterIterator(iter, closer, qp.Filter)

iter = common.SkipIterator(iter, closer, params.Skip)

iter = common.LimitIterator(iter, closer, params.Limit)

resDocs, err = iterator.ConsumeValues(iterator.Interface[struct{}, *types.Document](iter))
if err != nil {
return lazyerrors.Error(err)
}

Check warning on line 79 in internal/handlers/pg/msg_count.go

View check run for this annotation

Codecov / codecov/patch

internal/handlers/pg/msg_count.go#L67-L79

Added lines #L67 - L79 were not covered by tests

return nil

Check warning on line 81 in internal/handlers/pg/msg_count.go

View check run for this annotation

Codecov / codecov/patch

internal/handlers/pg/msg_count.go#L81

Added line #L81 was not covered by tests
})

if err != nil {
return nil, err
}

if resDocs, err = common.SkipDocuments(resDocs, params.Skip); err != nil {
return nil, lazyerrors.Error(err)
}

if resDocs, err = common.LimitDocuments(resDocs, params.Limit); err != nil {
return nil, lazyerrors.Error(err)
}

var reply wire.OpMsg
must.NoError(reply.SetSections(wire.OpMsgSection{
Documents: []*types.Document{must.NotFail(types.NewDocument(
Expand Down
37 changes: 24 additions & 13 deletions internal/handlers/pg/msg_distinct.go
Expand Up @@ -22,6 +22,7 @@
"github.com/FerretDB/FerretDB/internal/handlers/common"
"github.com/FerretDB/FerretDB/internal/handlers/pg/pgdb"
"github.com/FerretDB/FerretDB/internal/types"
"github.com/FerretDB/FerretDB/internal/util/iterator"
"github.com/FerretDB/FerretDB/internal/util/lazyerrors"
"github.com/FerretDB/FerretDB/internal/util/must"
"github.com/FerretDB/FerretDB/internal/wire"
Expand All @@ -47,21 +48,35 @@
qp := pgdb.QueryParams{
DB: dp.DB,
Collection: dp.Collection,
Filter: dp.Filter,
Comment: dp.Comment,
}

var resDocs []*types.Document
if !h.DisableFilterPushdown {
qp.Filter = dp.Filter
}

Check warning on line 56 in internal/handlers/pg/msg_distinct.go

View check run for this annotation

Codecov / codecov/patch

internal/handlers/pg/msg_distinct.go#L54-L56

Added lines #L54 - L56 were not covered by tests

var distinct *types.Array

Check warning on line 58 in internal/handlers/pg/msg_distinct.go

View check run for this annotation

Codecov / codecov/patch

internal/handlers/pg/msg_distinct.go#L58

Added line #L58 was not covered by tests
err = dbPool.InTransaction(ctx, func(tx pgx.Tx) error {
resDocs, err = fetchAndFilterDocs(ctx, &fetchParams{tx, &qp, h.DisableFilterPushdown})
return err
})
var iter types.DocumentsIterator

Check warning on line 60 in internal/handlers/pg/msg_distinct.go

View check run for this annotation

Codecov / codecov/patch

internal/handlers/pg/msg_distinct.go#L60

Added line #L60 was not covered by tests

if err != nil {
return nil, err
}
iter, _, err = pgdb.QueryDocuments(ctx, tx, &qp)
if err != nil {
return lazyerrors.Error(err)
}

Check warning on line 65 in internal/handlers/pg/msg_distinct.go

View check run for this annotation

Codecov / codecov/patch

internal/handlers/pg/msg_distinct.go#L62-L65

Added lines #L62 - L65 were not covered by tests

closer := iterator.NewMultiCloser(iter)
defer closer.Close()

iter = common.FilterIterator(iter, closer, qp.Filter)

distinct, err = common.FilterDistinctValues(iter, dp.Key)
if err != nil {
return lazyerrors.Error(err)
}

Check warning on line 75 in internal/handlers/pg/msg_distinct.go

View check run for this annotation

Codecov / codecov/patch

internal/handlers/pg/msg_distinct.go#L67-L75

Added lines #L67 - L75 were not covered by tests

return nil

Check warning on line 77 in internal/handlers/pg/msg_distinct.go

View check run for this annotation

Codecov / codecov/patch

internal/handlers/pg/msg_distinct.go#L77

Added line #L77 was not covered by tests
})

distinct, err := common.FilterDistinctValues(resDocs, dp.Key)
if err != nil {
return nil, err
}
Expand All @@ -74,9 +89,5 @@
))},
}))

if err != nil {
return nil, lazyerrors.Error(err)
}

return &reply, nil
}
19 changes: 15 additions & 4 deletions internal/handlers/tigris/msg_distinct.go
Expand Up @@ -20,6 +20,7 @@
"github.com/FerretDB/FerretDB/internal/handlers/common"
"github.com/FerretDB/FerretDB/internal/handlers/tigris/tigrisdb"
"github.com/FerretDB/FerretDB/internal/types"
"github.com/FerretDB/FerretDB/internal/util/iterator"
"github.com/FerretDB/FerretDB/internal/util/lazyerrors"
"github.com/FerretDB/FerretDB/internal/util/must"
"github.com/FerretDB/FerretDB/internal/wire"
Expand All @@ -42,18 +43,28 @@
return nil, err
}

filter := dp.Filter

Check warning on line 47 in internal/handlers/tigris/msg_distinct.go

View check run for this annotation

Codecov / codecov/patch

internal/handlers/tigris/msg_distinct.go#L46-L47

Added lines #L46 - L47 were not covered by tests
qp := tigrisdb.QueryParams{
DB: dp.DB,
Collection: dp.Collection,
Filter: dp.Filter,
}

resDocs, err := fetchAndFilterDocs(ctx, &fetchParams{dbPool, &qp, h.DisableFilterPushdown})
if !h.DisableFilterPushdown {
qp.Filter = filter
rumyantseva marked this conversation as resolved.
Show resolved Hide resolved
}

Check warning on line 55 in internal/handlers/tigris/msg_distinct.go

View check run for this annotation

Codecov / codecov/patch

internal/handlers/tigris/msg_distinct.go#L53-L55

Added lines #L53 - L55 were not covered by tests

iter, err := dbPool.QueryDocuments(ctx, &qp)

Check warning on line 57 in internal/handlers/tigris/msg_distinct.go

View check run for this annotation

Codecov / codecov/patch

internal/handlers/tigris/msg_distinct.go#L57

Added line #L57 was not covered by tests
if err != nil {
return nil, err
return nil, lazyerrors.Error(err)

Check warning on line 59 in internal/handlers/tigris/msg_distinct.go

View check run for this annotation

Codecov / codecov/patch

internal/handlers/tigris/msg_distinct.go#L59

Added line #L59 was not covered by tests
}

distinct, err := common.FilterDistinctValues(resDocs, dp.Key)
closer := iterator.NewMultiCloser(iter)
defer closer.Close()

iter = common.FilterIterator(iter, closer, filter)

distinct, err := common.FilterDistinctValues(iter, dp.Key)

Check warning on line 67 in internal/handlers/tigris/msg_distinct.go

View check run for this annotation

Codecov / codecov/patch

internal/handlers/tigris/msg_distinct.go#L62-L67

Added lines #L62 - L67 were not covered by tests
if err != nil {
return nil, err
}
Expand Down