Skip to content

Commit

Permalink
Simplify/sync delete a bit (#1104)
Browse files Browse the repository at this point in the history
  • Loading branch information
AlekSi committed Sep 1, 2022
1 parent edad1f8 commit cf4aeaa
Show file tree
Hide file tree
Showing 6 changed files with 124 additions and 119 deletions.
61 changes: 37 additions & 24 deletions integration/delete_compat_test.go
Expand Up @@ -43,30 +43,55 @@ func TestDeleteCompat(t *testing.T) {
filters: []bson.D{},
resultType: emptyResult,
},
"OrderedTrue": {

"One": {
filters: []bson.D{
{{"v", int32(42)}},
},
},
"All": {
filters: []bson.D{
{},
},
},

"Two": {
filters: []bson.D{
{{"v", int32(42)}},
{{"v", int32(0)}},
},
},
"TwoAll": {
filters: []bson.D{
{{"v", int32(42)}},
{},
},
},
"TwoAllOrdered": {
filters: []bson.D{
{{"v", "foo"}},
{{"v", bson.D{{"$all", 9}}}},
{{"v", float32(42.13)}},
{},
},
ordered: true,
},
"OrderedFalse": {

"OrderedError": {
filters: []bson.D{
{{"v", "foo"}},
{{"v", bson.D{{"$all", 9}}}},
{{"v", float32(42.13)}},
},
ordered: false,
ordered: true,
},
"OrderedTrueNoError": {
"UnorderedError": {
filters: []bson.D{
{{"v", "foo"}},
{{"v", bson.D{{"$all", 9}}}},
{{"v", float32(42.13)}},
},
ordered: true,
},
"OrderedTrueTwoErrors": {

"OrderedTwoErrors": {
filters: []bson.D{
{{"v", "foo"}},
{{"v", bson.D{{"$all", 9}}}},
Expand All @@ -75,16 +100,16 @@ func TestDeleteCompat(t *testing.T) {
},
ordered: true,
},
"OrderedFalseTwoErrors": {
"UnorderedTwoErrors": {
filters: []bson.D{
{{"v", "foo"}},
{{"v", bson.D{{"$all", 9}}}},
{{"v", float32(42.13)}},
{{"v", bson.D{{"$eq", 9}}}},
},
ordered: false,
},
"OrderedTrueAllErrors": {

"OrderedAllErrors": {
filters: []bson.D{
{{"v", bson.D{{"$all", 9}}}},
{{"v", bson.D{{"$eq", 9}}}},
Expand All @@ -93,26 +118,14 @@ func TestDeleteCompat(t *testing.T) {
ordered: true,
resultType: emptyResult,
},
"OrderedFalseAllErrors": {
"UnorderedAllErrors": {
filters: []bson.D{
{{"v", bson.D{{"$all", 9}}}},
{{"v", bson.D{{"$eq", 9}}}},
{{"v", bson.D{{"$all", 9}}}},
},
ordered: false,
resultType: emptyResult,
},
"One": {
filters: []bson.D{
{{"v", int32(42)}},
},
},
"Two": {
filters: []bson.D{
{{"v", int32(42)}},
{{"v", int32(0)}},
},
},
}

testDeleteCompat(t, testCases)
Expand Down
2 changes: 2 additions & 0 deletions integration/helpers.go
Expand Up @@ -246,6 +246,8 @@ func AssertEqualAltWriteError(t *testing.T, expected mongo.WriteError, altMessag
}

// UnsetRaw returns error with all Raw fields unset. It returns nil if err is nil.
//
// Error is checked using a regular type assertion; wrapped errors (errors.As) are not checked.
func UnsetRaw(t testing.TB, err error) error {
t.Helper()

Expand Down
22 changes: 10 additions & 12 deletions internal/handlers/common/error.go
Expand Up @@ -121,7 +121,7 @@ func ProtocolError(err error) (ProtoErr, bool) {
panic("err is nil")
}

var e *Error
var e *CommandError
if errors.As(err, &e) {
return e, true
}
Expand All @@ -131,14 +131,11 @@ func ProtocolError(err error) (ProtoErr, bool) {
return writeErr, true
}

return NewError(errInternalError, err).(*Error), false
return NewError(errInternalError, err).(*CommandError), false
}

// CommandError represents wire protocol command error.
type CommandError = Error

// Error is a deprecated name for CommandError; instead, use the later version in the new code.
type Error struct {
type CommandError struct {
err error
code ErrorCode
}
Expand All @@ -153,7 +150,8 @@ func NewError(code ErrorCode, err error) error {
if err == nil {
panic("err is nil")
}
return &Error{

return &CommandError{
code: code,
err: err,
}
Expand All @@ -167,22 +165,22 @@ func NewErrorMsg(code ErrorCode, msg string) error {
}

// Error implements error interface.
func (e *Error) Error() string {
func (e *CommandError) Error() string {
return fmt.Sprintf("%[1]s (%[1]d): %[2]v", e.code, e.err)
}

// Code implements ProtoErr interface.
func (e *Error) Code() ErrorCode {
func (e *CommandError) Code() ErrorCode {
return e.code
}

// Unwrap implements standard error unwrapping interface.
func (e *Error) Unwrap() error {
func (e *CommandError) Unwrap() error {
return e.err
}

// Document returns wire protocol error document.
func (e *Error) Document() *types.Document {
func (e *CommandError) Document() *types.Document {
d := must.NotFail(types.NewDocument(
"ok", float64(0),
"errmsg", e.err.Error(),
Expand Down Expand Up @@ -331,6 +329,6 @@ func formatBitwiseOperatorErr(err error, operator string, maskValue any) error {

// check interfaces
var (
_ ProtoErr = (*Error)(nil)
_ ProtoErr = (*CommandError)(nil)
_ ProtoErr = (*WriteErrors)(nil)
)
60 changes: 28 additions & 32 deletions internal/handlers/pg/msg_delete.go
Expand Up @@ -112,7 +112,8 @@ func (h *Handler) MsgDelete(ctx context.Context, msg *wire.OpMsg) (*wire.OpMsg,
}

resDocs := make([]*types.Document, 0, 16)
err = h.pgPool.InTransaction(ctx, func(tx pgx.Tx) error {

return h.pgPool.InTransaction(ctx, func(tx pgx.Tx) error {
// fetch current items from collection
fetchedChan, err := h.pgPool.QueryDocuments(ctx, tx, sp)
if err != nil {
Expand Down Expand Up @@ -163,56 +164,51 @@ func (h *Handler) MsgDelete(ctx context.Context, msg *wire.OpMsg) (*wire.OpMsg,

return nil
})

return err
}

delErrors := new(common.WriteErrors)

var reply wire.OpMsg
var delErrors common.WriteErrors

// process every delete filter
for i := 0; i < deletes.Len(); i++ {
err := processQuery(i)
if err != nil {
switch err.(type) {
switch err.(type) {
case nil:
continue

case *common.CommandError:
// command errors should be return immediately
case *common.CommandError:
return nil, err
return nil, err

default:
// write errors and others require to be handled in array
default:
delErrors.Append(err, int32(i))

// Delete statements in the `deletes` field are not transactional.
// It means that we run each delete statement separately.
// If `ordered` is set as `true`, we don't execute the remaining statements
// after the first failure.
// If `ordered` is set as `false`, we execute all the statements and return
// the list of errors corresponding to the failed statements.
if !ordered {
continue
}
delErrors.Append(err, int32(i))

// Delete statements in the `deletes` field are not transactional.
// It means that we run each delete statement separately.
// If `ordered` is set as `true`, we don't execute the remaining statements
// after the first failure.
// If `ordered` is set as `false`, we execute all the statements and return
// the list of errors corresponding to the failed statements.
if !ordered {
continue
}

// send response if ordered is true
break
}

// send response if ordered is true
break
}

var replyDoc *types.Document
replyDoc := must.NotFail(types.NewDocument(
"ok", float64(1),
))

// if there are delete errors append writeErrors field
if len(*delErrors) > 0 {
if len(delErrors) > 0 {
replyDoc = delErrors.Document()
} else {
replyDoc = must.NotFail(types.NewDocument(
"ok", float64(1),
))
}

must.NoError(replyDoc.Set("n", deleted))

var reply wire.OpMsg
err = reply.SetSections(wire.OpMsgSection{
Documents: []*types.Document{replyDoc},
})
Expand Down
22 changes: 10 additions & 12 deletions internal/handlers/tigris/msg_create.go
Expand Up @@ -81,19 +81,17 @@ func (h *Handler) MsgCreate(ctx context.Context, msg *wire.OpMsg) (*wire.OpMsg,
b := must.NotFail(json.Marshal(schema))

created, err := h.db.CreateCollectionIfNotExist(ctx, db, collection, b)
if err != nil {
switch err := err.(type) {
case nil:
// do nothing
case *driver.Error:
if tigrisdb.IsInvalidArgument(err) {
return nil, common.NewError(common.ErrBadValue, err)
}

return nil, lazyerrors.Error(err)
default:
return nil, lazyerrors.Error(err)
switch err := err.(type) {
case nil:
// do nothing
case *driver.Error:
if tigrisdb.IsInvalidArgument(err) {
return nil, common.NewError(common.ErrBadValue, err)
}

return nil, lazyerrors.Error(err)
default:
return nil, lazyerrors.Error(err)
}

if !created {
Expand Down

0 comments on commit cf4aeaa

Please sign in to comment.