Skip to content

Commit

Permalink
merge
Browse files Browse the repository at this point in the history
  • Loading branch information
rumyantseva committed Nov 30, 2022
2 parents 03c2304 + 0a63e58 commit 2b92bd6
Show file tree
Hide file tree
Showing 6 changed files with 55 additions and 384 deletions.
42 changes: 2 additions & 40 deletions internal/handlers/pg/msg_count.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (
"github.com/FerretDB/FerretDB/internal/handlers/common"
"github.com/FerretDB/FerretDB/internal/handlers/pg/pgdb"
"github.com/FerretDB/FerretDB/internal/types"
"github.com/FerretDB/FerretDB/internal/util/iterator"
"github.com/FerretDB/FerretDB/internal/util/lazyerrors"
"github.com/FerretDB/FerretDB/internal/util/must"
"github.com/FerretDB/FerretDB/internal/wire"
Expand Down Expand Up @@ -87,45 +86,8 @@ func (h *Handler) MsgCount(ctx context.Context, msg *wire.OpMsg) (*wire.OpMsg, e

resDocs := make([]*types.Document, 0, 16)
err = h.PgPool.InTransaction(ctx, func(tx pgx.Tx) error {
var it iterator.Interface[uint32, *types.Document]
it, err = h.PgPool.GetDocuments(ctx, tx, &sp)
if err != nil {
return err
}

defer it.Close()

for {
var doc *types.Document
_, doc, err = it.Next()

// if the context is canceled, we don't need to continue processing documents
if ctx.Err() != nil {
return ctx.Err()
}

switch {
case err == nil:
// do nothing
case errors.Is(err, iterator.ErrIteratorDone):
// no more documents
return nil
default:
return err
}

var matches bool
matches, err = common.FilterDocument(doc, filter)
if err != nil {
return err
}

if !matches {
continue
}

resDocs = append(resDocs, doc)
}
resDocs, err = h.fetchAndFilterDocs(ctx, tx, &sp)
return err
})

if err != nil {
Expand Down
80 changes: 49 additions & 31 deletions internal/handlers/pg/msg_find.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,15 @@ package pg

import (
"context"
"errors"
"time"

"github.com/jackc/pgx/v4"

"github.com/FerretDB/FerretDB/internal/handlers/common"
"github.com/FerretDB/FerretDB/internal/handlers/pg/pgdb"
"github.com/FerretDB/FerretDB/internal/types"
"github.com/FerretDB/FerretDB/internal/util/iterator"
"github.com/FerretDB/FerretDB/internal/util/lazyerrors"
"github.com/FerretDB/FerretDB/internal/util/must"
"github.com/FerretDB/FerretDB/internal/wire"
Expand Down Expand Up @@ -63,37 +65,8 @@ func (h *Handler) MsgFind(ctx context.Context, msg *wire.OpMsg) (*wire.OpMsg, er

resDocs := make([]*types.Document, 0, 16)
err = h.PgPool.InTransaction(ctx, func(tx pgx.Tx) error {
fetchedChan, err := h.PgPool.QueryDocuments(ctx, tx, &sp)
if err != nil {
return err
}
defer func() {
// Drain the channel to prevent leaking goroutines.
// TODO Offer a better design instead of channels: https://github.com/FerretDB/FerretDB/issues/898.
for range fetchedChan {
}
}()

for fetchedItem := range fetchedChan {
if fetchedItem.Err != nil {
return fetchedItem.Err
}

for _, doc := range fetchedItem.Docs {
matches, err := common.FilterDocument(doc, params.Filter)
if err != nil {
return err
}

if !matches {
continue
}

resDocs = append(resDocs, doc)
}
}

return nil
resDocs, err = h.fetchAndFilterDocs(ctx, tx, &sp)
return err
})

if err != nil {
Expand Down Expand Up @@ -136,3 +109,48 @@ func (h *Handler) MsgFind(ctx context.Context, msg *wire.OpMsg) (*wire.OpMsg, er

return &reply, nil
}

// fetchAndFilterDocs fetches documents from the database and filters them using the provided sqlParam.Filter.
func (h *Handler) fetchAndFilterDocs(ctx context.Context, tx pgx.Tx, sqlParam *pgdb.SQLParam) ([]*types.Document, error) {
resDocs := make([]*types.Document, 0, 16)

var it iterator.Interface[uint32, *types.Document]

it, err := h.PgPool.GetDocuments(ctx, tx, sqlParam)
if err != nil {
return nil, err
}

defer it.Close()

for {
var doc *types.Document
_, doc, err = it.Next()

// if the context is canceled, we don't need to continue processing documents
if ctx.Err() != nil {
return nil, ctx.Err()
}

switch {
case err == nil:
// do nothing
case errors.Is(err, iterator.ErrIteratorDone):
// no more documents
return resDocs, nil
default:
return nil, err
}

matches, err := common.FilterDocument(doc, sqlParam.Filter)
if err != nil {
return nil, err
}

if !matches {
continue
}

resDocs = append(resDocs, doc)
}
}
35 changes: 3 additions & 32 deletions internal/handlers/pg/msg_findandmodify.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,46 +75,17 @@ func (h *Handler) MsgFindAndModify(ctx context.Context, msg *wire.OpMsg) (*wire.
// We might consider rewriting it later.
var reply wire.OpMsg
err = h.PgPool.InTransaction(ctx, func(tx pgx.Tx) error {
resDocs := make([]*types.Document, 0, 16)

fetchedChan, err := h.PgPool.QueryDocuments(ctx, tx, &sqlParam)
var resDocs []*types.Document
resDocs, err = h.fetchAndFilterDocs(ctx, tx, &sqlParam)
if err != nil {
return err
}
defer func() {
// Drain the channel to prevent leaking goroutines.
// TODO Offer a better design instead of channels: https://github.com/FerretDB/FerretDB/issues/898.
for range fetchedChan {
}
}()

var fetchedDocs []*types.Document
for fetchedItem := range fetchedChan {
if fetchedItem.Err != nil {
return fetchedItem.Err
}

fetchedDocs = append(fetchedDocs, fetchedItem.Docs...)
}

err = common.SortDocuments(fetchedDocs, params.Sort)
err = common.SortDocuments(resDocs, params.Sort)
if err != nil {
return err
}

for _, doc := range fetchedDocs {
matches, err := common.FilterDocument(doc, params.Query)
if err != nil {
return err
}

if !matches {
continue
}

resDocs = append(resDocs, doc)
}

// findAndModify always works with a single document
if resDocs, err = common.LimitDocuments(resDocs, 1); err != nil {
return err
Expand Down
28 changes: 1 addition & 27 deletions internal/handlers/pg/msg_update.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,36 +137,10 @@ func (h *Handler) MsgUpdate(ctx context.Context, msg *wire.OpMsg) (*wire.OpMsg,

sp.Filter = q

resDocs := make([]*types.Document, 0, 16)
fetchedChan, err := h.PgPool.QueryDocuments(ctx, tx, &sp)
resDocs, err := h.fetchAndFilterDocs(ctx, tx, &sp)
if err != nil {
return err
}
defer func() {
// Drain the channel to prevent leaking goroutines.
// TODO Offer a better design instead of channels: https://github.com/FerretDB/FerretDB/issues/898.
for range fetchedChan {
}
}()

for fetchedItem := range fetchedChan {
if fetchedItem.Err != nil {
return fetchedItem.Err
}

for _, doc := range fetchedItem.Docs {
matches, err := common.FilterDocument(doc, q)
if err != nil {
return err
}

if !matches {
continue
}

resDocs = append(resDocs, doc)
}
}

if len(resDocs) == 0 {
if !upsert {
Expand Down
111 changes: 0 additions & 111 deletions internal/handlers/pg/pgdb/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,61 +54,6 @@ type SQLParam struct {
Filter *types.Document
}

// QueryDocuments returns a channel with buffer FetchedChannelBufSize
// to fetch list of documents for given FerretDB database and collection.
//
// If an error occurs before the fetching, the error is returned immediately.
// The returned channel is always non-nil.
//
// The channel is closed when all documents are sent; the caller should always drain the channel.
// If an error occurs during fetching, the last message before closing the channel contains an error.
// Context cancellation is not considered an error.
//
// If the collection doesn't exist, fetch returns a closed channel and no error.
//
// Deprecated: use GetDocuments, TODO remove in https://github.com/FerretDB/FerretDB/issues/898.
func (pgPool *Pool) QueryDocuments(ctx context.Context, tx pgx.Tx, sp *SQLParam) (<-chan FetchedDocs, error) {
fetchedChan := make(chan FetchedDocs, FetchedChannelBufSize)

q, args, err := buildQuery(ctx, tx, sp)
if err != nil {
close(fetchedChan)
if errors.Is(err, ErrTableNotExist) {
return fetchedChan, nil
}
return fetchedChan, lazyerrors.Error(err)
}

rows, err := tx.Query(ctx, q, args...)
if err != nil {
close(fetchedChan)
return fetchedChan, lazyerrors.Error(err)
}

go func() {
defer close(fetchedChan)
defer rows.Close()

err := iterateFetch(ctx, fetchedChan, rows)
switch {
case err == nil:
// nothing
case errors.Is(err, context.Canceled), errors.Is(err, context.DeadlineExceeded):
pgPool.Config().ConnConfig.Logger.Log(
ctx, pgx.LogLevelWarn, "context canceled, stopping fetching",
map[string]any{"db": sp.DB, "collection": sp.Collection, "error": err},
)
default:
pgPool.Config().ConnConfig.Logger.Log(
ctx, pgx.LogLevelError, "got error, stopping fetching",
map[string]any{"db": sp.DB, "collection": sp.Collection, "error": err},
)
}
}()

return fetchedChan, nil
}

// GetDocuments returns an queryIterator to fetch documents for given SQLParams.
// If the collection doesn't exist, it returns an empty iterator and no error.
// If an error occurs, it returns nil and that error, possibly wrapped.
Expand Down Expand Up @@ -253,62 +198,6 @@ func prepareWhereClause(sqlFilters *types.Document) (string, []any) {
return query, args
}

// iterateFetch iterates over the rows returned by the query and sends FetchedDocs to fetched channel.
// It returns ctx.Err() if context cancellation was received.
func iterateFetch(ctx context.Context, fetched chan FetchedDocs, rows pgx.Rows) error {
for ctx.Err() == nil {
var allFetched bool
res := make([]*types.Document, 0, FetchedSliceCapacity)
for i := 0; i < FetchedSliceCapacity; i++ {
if !rows.Next() {
allFetched = true
break
}

var b []byte
if err := rows.Scan(&b); err != nil {
return writeFetched(ctx, fetched, FetchedDocs{Err: lazyerrors.Error(err)})
}

doc, err := pjson.Unmarshal(b)
if err != nil {
return writeFetched(ctx, fetched, FetchedDocs{Err: lazyerrors.Error(err)})
}

res = append(res, doc.(*types.Document))
}

if len(res) > 0 {
if err := writeFetched(ctx, fetched, FetchedDocs{Docs: res}); err != nil {
return err
}
}

if allFetched {
if err := rows.Err(); err != nil {
if ferr := writeFetched(ctx, fetched, FetchedDocs{Err: lazyerrors.Error(err)}); ferr != nil {
return ferr
}
}

return nil
}
}

return ctx.Err()
}

// writeFetched sends FetchedDocs to fetched channel or handles context cancellation.
// It returns ctx.Err() if context cancellation was received.
func writeFetched(ctx context.Context, fetched chan FetchedDocs, doc FetchedDocs) error {
select {
case <-ctx.Done():
return ctx.Err()
case fetched <- doc:
return nil
}
}

// convertJSON transforms decoded JSON map[string]any value into *types.Document.
func convertJSON(value any) any {
switch value := value.(type) {
Expand Down
Loading

0 comments on commit 2b92bd6

Please sign in to comment.