Skip to content

Commit

Permalink
Require limit parameter in delete command (#1066)
Browse files Browse the repository at this point in the history
Closes #982.
  • Loading branch information
noisersup committed Sep 1, 2022
1 parent 2b61063 commit 84c5a89
Show file tree
Hide file tree
Showing 6 changed files with 186 additions and 61 deletions.
66 changes: 66 additions & 0 deletions integration/delete_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,11 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"

"github.com/FerretDB/FerretDB/integration/setup"
"github.com/FerretDB/FerretDB/integration/shareddata"
"github.com/FerretDB/FerretDB/internal/handlers/common"
)

// TestDeleteSimple checks simple cases of doc deletion.
Expand Down Expand Up @@ -55,3 +57,67 @@ func TestDeleteSimple(t *testing.T) {
})
}
}

func TestDeleteLimitErrors(t *testing.T) {
t.Parallel()

for name, tc := range map[string]struct {
deletes bson.A
expectedErr *mongo.CommandError
skip string
}{
"NotSet": {
deletes: bson.A{bson.D{{"q", bson.D{{"v", "foo"}}}}},
expectedErr: &mongo.CommandError{
Code: int32(common.ErrMissingField),
Name: common.ErrMissingField.String(),
Message: "BSON field 'delete.deletes.limit' is missing but a required field",
},
},
"ValidFloat": {
deletes: bson.A{bson.D{{"q", bson.D{{"v", "foo"}}}, {"limit", 1.00}}},
},
"ValidString": {
deletes: bson.A{bson.D{{"q", bson.D{{"v", "foo"}}}, {"limit", "1"}}},
skip: "https://github.com/FerretDB/FerretDB/issues/1089",
},
"InvalidFloat": {
deletes: bson.A{bson.D{{"q", bson.D{{"v", "foo"}}}, {"limit", 42.13}}},
expectedErr: &mongo.CommandError{
Code: int32(common.ErrFailedToParse),
Name: common.ErrFailedToParse.String(),
Message: "The limit field in delete objects must be 0 or 1. Got 42.13",
},
},
"InvalidInt": {
deletes: bson.A{bson.D{{"q", bson.D{{"v", "foo"}}}, {"limit", 100}}},
expectedErr: &mongo.CommandError{
Code: int32(common.ErrFailedToParse),
Name: common.ErrFailedToParse.String(),
Message: "The limit field in delete objects must be 0 or 1. Got 100",
},
},
} {
name, tc := name, tc
t.Run(name, func(t *testing.T) {
t.Parallel()

if tc.skip != "" {
t.Skip(tc.skip)
}

ctx, collection := setup.Setup(t)

res := collection.Database().RunCommand(ctx, bson.D{
{"delete", collection.Name()},
{"deletes", tc.deletes},
})

if tc.expectedErr != nil {
AssertEqualError(t, *tc.expectedErr, res.Err())
return
}
assert.Equal(t, nil, res.Err())
})
}
}
3 changes: 3 additions & 0 deletions internal/handlers/common/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,9 @@ const (
// while projection document already marked as inclusion.
ErrProjectionExIn = ErrorCode(31254) // Location31254

// ErrMissingField indicates that the required field in document is missing.
ErrMissingField = ErrorCode(40414) // Location40414

// ErrFreeMonitoringDisabled indicates that free monitoring is disabled
// by command-line or config file.
ErrFreeMonitoringDisabled = ErrorCode(50840) // Location50840
Expand Down
12 changes: 7 additions & 5 deletions internal/handlers/common/errorcode_string.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions internal/handlers/common/params.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ var (
// such as used in the limit, $size, etc.
func GetWholeNumberParam(value any) (int64, error) {
switch value := value.(type) {
// TODO: add string support https://github.com/FerretDB/FerretDB/issues/1089
case float64:
// TODO check float negative zero (math.Copysign(0, -1))
if value != math.Trunc(value) || math.IsNaN(value) || math.IsInf(value, 0) {
Expand Down
51 changes: 36 additions & 15 deletions internal/handlers/pg/msg_delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,11 +68,21 @@ func (h *Handler) MsgDelete(ctx context.Context, msg *wire.OpMsg) (*wire.OpMsg,
return err
}

var limit int64 // TODO https://github.com/FerretDB/FerretDB/issues/982
if l, _ := d.Get("limit"); l != nil {
if limit, err = common.GetWholeNumberParam(l); err != nil {
return err
}
var limit int64

l, err := d.Get("limit")
if err != nil {
return common.NewErrorMsg(
common.ErrMissingField,
"BSON field 'delete.deletes.limit' is missing but a required field",
)
}

if limit, err = common.GetWholeNumberParam(l); err != nil || limit < 0 || limit > 1 {
return common.NewErrorMsg(
common.ErrFailedToParse,
fmt.Sprintf("The limit field in delete objects must be 0 or 1. Got %v", l),
)
}

var sp pgdb.SQLParam
Expand Down Expand Up @@ -165,17 +175,28 @@ func (h *Handler) MsgDelete(ctx context.Context, msg *wire.OpMsg) (*wire.OpMsg,
for i := 0; i < deletes.Len(); i++ {
err := processQuery(i)
if err != nil {
delErrors.Append(err, int32(i))

// Delete statements in the `deletes` field are not transactional.
// It means that we run each delete statement separately.
// If `ordered` is set as `true`, we don't execute the remaining statements
// after the first failure.
// If `ordered` is set as `false`, we execute all the statements and return
// the list of errors corresponding to the failed statements.
if ordered {
break
switch err.(type) {
// command errors should be return immediately
case *common.CommandError:
return nil, err

// write errors and others require to be handled in array
default:
delErrors.Append(err, int32(i))

// Delete statements in the `deletes` field are not transactional.
// It means that we run each delete statement separately.
// If `ordered` is set as `true`, we don't execute the remaining statements
// after the first failure.
// If `ordered` is set as `false`, we execute all the statements and return
// the list of errors corresponding to the failed statements.
if !ordered {
continue
}
}

// send response if ordered is true
break
}
}

Expand Down
114 changes: 73 additions & 41 deletions internal/handlers/tigris/msg_delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,12 +72,21 @@ func (h *Handler) MsgDelete(ctx context.Context, msg *wire.OpMsg) (*wire.OpMsg,
return err
}

// TODO https://github.com/FerretDB/FerretDB/issues/982
var limit int64
if l, _ := d.Get("limit"); l != nil {
if limit, err = common.GetWholeNumberParam(l); err != nil {
return err
}

l, err := d.Get("limit")
if err != nil {
return common.NewErrorMsg(
common.ErrMissingField,
"BSON field 'delete.deletes.limit' is missing but a required field",
)
}

if limit, err = common.GetWholeNumberParam(l); err != nil || limit < 0 || limit > 1 {
return common.NewErrorMsg(
common.ErrFailedToParse,
fmt.Sprintf("The limit field in delete objects must be 0 or 1. Got %v", l),
)
}

var fp tigrisdb.FetchParam
Expand Down Expand Up @@ -105,59 +114,72 @@ func (h *Handler) MsgDelete(ctx context.Context, msg *wire.OpMsg) (*wire.OpMsg,
}

resDocs := make([]*types.Document, 0, 16)
// iterate through every row and delete matching ones
for _, doc := range fetchedDocs {
// fetch current items from collection
matches, err := common.FilterDocument(doc, filter)
if err != nil {

return respondWithStack(func() error {
// iterate through every row and delete matching ones
for _, doc := range fetchedDocs {
// fetch current items from collection
matches, err := common.FilterDocument(doc, filter)
if err != nil {
return err
}

if !matches {
continue
}

resDocs = append(resDocs, doc)
}

if resDocs, err = common.LimitDocuments(resDocs, limit); err != nil {
return err
}

if !matches {
continue
// if no field is matched in a row, go to the next one
if len(resDocs) == 0 {
return nil
}

resDocs = append(resDocs, doc)
}
res, err := h.delete(ctx, fp, resDocs)
if err != nil {
return err
}

if resDocs, err = common.LimitDocuments(resDocs, limit); err != nil {
return err
}
deleted += int32(res)

// if no field is matched in a row, go to the next one
if len(resDocs) == 0 {
return nil
}

res, err := h.delete(ctx, fp, resDocs)
if err != nil {
return err
}

deleted += int32(res)

return nil
})
}

delErrors := new(common.WriteErrors)

var reply wire.OpMsg

// process every delete filter
for i := 0; i < deletes.Len(); i++ {
err = processQuery(i)
err := processQuery(i)
if err != nil {
delErrors.Append(err, int32(i))

// Delete statements in the `deletes` field are not transactional.
// It means that we run each delete statement separately.
// If `ordered` is set as `true`, we don't execute the remaining statements
// after the first failure.
// If `ordered` is set as `false`, we execute all the statements and return
// the list of errors corresponding to the failed statements.
if ordered {
break
switch err.(type) {
// command errors should be return immediately
case *common.CommandError:
return nil, err

// write errors and others require to be handled in array
default:
delErrors.Append(err, int32(i))

// Delete statements in the `deletes` field are not transactional.
// It means that we run each delete statement separately.
// If `ordered` is set as `true`, we don't execute the remaining statements
// after the first failure.
// If `ordered` is set as `false`, we execute all the statements and return
// the list of errors corresponding to the failed statements.
if !ordered {
continue
}
}

// send response if ordered is true
break
}
}

Expand Down Expand Up @@ -211,3 +233,13 @@ func (h *Handler) delete(ctx context.Context, fp tigrisdb.FetchParam, docs []*ty

return len(ids), nil
}

// respondWithStack calls the fun. If fun returns
// not-nil error then it is wrapped with lazyerrors.Error.
func respondWithStack(fun func() error) error {
if err := fun(); err != nil {
return lazyerrors.Error(err)
}

return nil
}

0 comments on commit 84c5a89

Please sign in to comment.