Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replace document slices with iterators #2730

Merged
merged 19 commits into from Jun 2, 2023
Merged
17 changes: 14 additions & 3 deletions internal/handlers/common/distinct.go
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/FerretDB/FerretDB/internal/handlers/commonerrors"
"github.com/FerretDB/FerretDB/internal/handlers/commonparams"
"github.com/FerretDB/FerretDB/internal/types"
"github.com/FerretDB/FerretDB/internal/util/iterator"
"github.com/FerretDB/FerretDB/internal/util/lazyerrors"
)

Expand Down Expand Up @@ -65,10 +66,20 @@ func GetDistinctParams(document *types.Document, l *zap.Logger) (*DistinctParams
//
// If the key is found in the document, and the value is an array, each element of the array is added to the result.
// Otherwise, the value itself is added to the result.
func FilterDistinctValues(docs []*types.Document, key string) (*types.Array, error) {
distinct := types.MakeArray(len(docs))
func FilterDistinctValues(iter types.DocumentsIterator, key string) (*types.Array, error) {
distinct := types.MakeArray(0)
defer iter.Close()

for {
_, doc, err := iter.Next()
if err != nil {
if err == iterator.ErrIteratorDone {
break
}

return nil, err
}

for _, doc := range docs {
// docsAtSuffix contains all documents exist at the suffix key.
docsAtSuffix := []*types.Document{doc}
suffix := key
Expand Down
35 changes: 24 additions & 11 deletions internal/handlers/pg/msg_count.go
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/FerretDB/FerretDB/internal/handlers/common"
"github.com/FerretDB/FerretDB/internal/handlers/pg/pgdb"
"github.com/FerretDB/FerretDB/internal/types"
"github.com/FerretDB/FerretDB/internal/util/iterator"
"github.com/FerretDB/FerretDB/internal/util/lazyerrors"
"github.com/FerretDB/FerretDB/internal/util/must"
"github.com/FerretDB/FerretDB/internal/wire"
Expand Down Expand Up @@ -50,26 +51,38 @@ func (h *Handler) MsgCount(ctx context.Context, msg *wire.OpMsg) (*wire.OpMsg, e
Collection: params.Collection,
}

qp.Filter = params.Filter
if !h.DisableFilterPushdown {
qp.Filter = params.Filter
}

var resDocs []*types.Document
err = dbPool.InTransaction(ctx, func(tx pgx.Tx) error {
resDocs, err = fetchAndFilterDocs(ctx, &fetchParams{tx, &qp, h.DisableFilterPushdown})
return err
iter, _, err := pgdb.QueryDocuments(ctx, tx, &qp)
if err != nil {
return err
rumyantseva marked this conversation as resolved.
Show resolved Hide resolved
}

closer := iterator.NewMultiCloser(iter)
defer closer.Close()

iter = common.FilterIterator(iter, closer, qp.Filter)

iter = common.SkipIterator(iter, closer, params.Skip)

iter = common.LimitIterator(iter, closer, params.Limit)

resDocs, err = iterator.ConsumeValues(iterator.Interface[struct{}, *types.Document](iter))
if err != nil {
return lazyerrors.Error(err)
}

return nil
})

if err != nil {
return nil, err
}

if resDocs, err = common.SkipDocuments(resDocs, params.Skip); err != nil {
return nil, lazyerrors.Error(err)
}

if resDocs, err = common.LimitDocuments(resDocs, params.Limit); err != nil {
return nil, lazyerrors.Error(err)
}

var reply wire.OpMsg
must.NoError(reply.SetSections(wire.OpMsgSection{
Documents: []*types.Document{must.NotFail(types.NewDocument(
Expand Down
29 changes: 17 additions & 12 deletions internal/handlers/pg/msg_distinct.go
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/FerretDB/FerretDB/internal/handlers/common"
"github.com/FerretDB/FerretDB/internal/handlers/pg/pgdb"
"github.com/FerretDB/FerretDB/internal/types"
"github.com/FerretDB/FerretDB/internal/util/iterator"
"github.com/FerretDB/FerretDB/internal/util/lazyerrors"
"github.com/FerretDB/FerretDB/internal/util/must"
"github.com/FerretDB/FerretDB/internal/wire"
Expand All @@ -47,24 +48,28 @@ func (h *Handler) MsgDistinct(ctx context.Context, msg *wire.OpMsg) (*wire.OpMsg
qp := pgdb.QueryParams{
DB: dp.DB,
Collection: dp.Collection,
Filter: dp.Filter,
Comment: dp.Comment,
}

var resDocs []*types.Document
if !h.DisableFilterPushdown {
qp.Filter = dp.Filter
}

var distinct *types.Array
err = dbPool.InTransaction(ctx, func(tx pgx.Tx) error {
resDocs, err = fetchAndFilterDocs(ctx, &fetchParams{tx, &qp, h.DisableFilterPushdown})
return err
})
iter, _, err := pgdb.QueryDocuments(ctx, tx, &qp)
if err != nil {
return err
rumyantseva marked this conversation as resolved.
Show resolved Hide resolved
}

if err != nil {
return nil, err
}
closer := iterator.NewMultiCloser(iter)
defer closer.Close()

distinct, err := common.FilterDistinctValues(resDocs, dp.Key)
if err != nil {
return nil, err
}
iter = common.FilterIterator(iter, closer, qp.Filter)

distinct, err = common.FilterDistinctValues(iter, dp.Key)
return err
})

var reply wire.OpMsg
must.NoError(reply.SetSections(wire.OpMsgSection{
Expand Down
14 changes: 11 additions & 3 deletions internal/handlers/tigris/msg_distinct.go
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/FerretDB/FerretDB/internal/handlers/common"
"github.com/FerretDB/FerretDB/internal/handlers/tigris/tigrisdb"
"github.com/FerretDB/FerretDB/internal/types"
"github.com/FerretDB/FerretDB/internal/util/iterator"
"github.com/FerretDB/FerretDB/internal/util/lazyerrors"
"github.com/FerretDB/FerretDB/internal/util/must"
"github.com/FerretDB/FerretDB/internal/wire"
Expand Down Expand Up @@ -48,12 +49,19 @@ func (h *Handler) MsgDistinct(ctx context.Context, msg *wire.OpMsg) (*wire.OpMsg
Filter: dp.Filter,
}

resDocs, err := fetchAndFilterDocs(ctx, &fetchParams{dbPool, &qp, h.DisableFilterPushdown})
fp := &fetchParams{dbPool, &qp, h.DisableFilterPushdown}

iter, err := fp.dbPool.QueryDocuments(ctx, fp.qp)
if err != nil {
return nil, err
return nil, lazyerrors.Error(err)
}

distinct, err := common.FilterDistinctValues(resDocs, dp.Key)
closer := iterator.NewMultiCloser(iter)
defer closer.Close()

iter = common.FilterIterator(iter, closer, qp.Filter)

distinct, err := common.FilterDistinctValues(iter, dp.Key)
if err != nil {
return nil, err
}
Expand Down