Skip to content

Commit

Permalink
Make updates work (#385)
Browse files Browse the repository at this point in the history
  • Loading branch information
AlekSi committed Mar 23, 2022
1 parent 8219693 commit 0ec1502
Showing 1 changed file with 130 additions and 145 deletions.
275 changes: 130 additions & 145 deletions internal/handlers/jsonb1/msg_update.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,154 +18,139 @@ import (
"context"
"fmt"

"github.com/jackc/pgx/v4"

"github.com/FerretDB/FerretDB/internal/fjson"
"github.com/FerretDB/FerretDB/internal/handlers/common"
"github.com/FerretDB/FerretDB/internal/types"
"github.com/FerretDB/FerretDB/internal/util/lazyerrors"
"github.com/FerretDB/FerretDB/internal/util/must"
"github.com/FerretDB/FerretDB/internal/wire"
)

// MsgUpdate modifies an existing document or documents in a collection.
func (s *storage) MsgUpdate(ctx context.Context, msg *wire.OpMsg) (*wire.OpMsg, error) {
return nil, fmt.Errorf("TODO")

// document, err := msg.Document()
// if err != nil {
// return nil, lazyerrors.Error(err)
// }

// if err := common.Unimplemented(document, "let"); err != nil {
// return nil, err
// }
// common.Ignored(document, s.l, "ordered", "writeConcern", "bypassDocumentValidation", "comment")

// command := document.Command()

// var db, collection string
// if db, err = common.GetRequiredParam[string](document, "$db"); err != nil {
// return nil, err
// }
// if collection, err = common.GetRequiredParam[string](document, command); err != nil {
// return nil, err
// }

// m := document.Map()
// docs, _ := m["updates"].(*types.Array)

// var selected, updated int32
// for i := 0; i < docs.Len(); i++ {
// doc, err := docs.Get(i)
// if err != nil {
// return nil, lazyerrors.Error(err)
// }

// unimplementedFields := []string{
// "c",
// "upsert",
// "multi",
// "collation",
// "arrayFilters",
// "hint",
// }
// if err := common.Unimplemented(doc.(*types.Document), unimplementedFields...); err != nil {
// return nil, err
// }

// docM := doc.(*types.Document).Map()

// sql := fmt.Sprintf(`SELECT _jsonb FROM %s`, pgx.Identifier{db, collection}.Sanitize())
// var placeholder pg.Placeholder

// whereSQL, args, err := where(docM["q"].(*types.Document), &placeholder)
// if err != nil {
// return nil, lazyerrors.Error(err)
// }

// sql += whereSQL

// rows, err := s.pgPool.Query(ctx, sql, args...)
// if err != nil {
// return nil, err
// }
// defer rows.Close()

// var updateDocs types.Array

// for {
// updateDoc, err := nextRow(rows)
// if err != nil {
// return nil, err
// }
// if updateDoc == nil {
// break
// }

// if err = updateDocs.Append(updateDoc); err != nil {
// return nil, lazyerrors.Error(err)
// }
// }

// selected += int32(updateDocs.Len())

// for i := 0; i < updateDocs.Len(); i++ {
// updateDoc, err := updateDocs.Get(i)
// if err != nil {
// return nil, lazyerrors.Error(err)
// }

// d := updateDoc.(*types.Document)

// for updateOp, updateV := range docM["u"].(*types.Document).Map() {
// switch updateOp {
// case "$set":
// for k, v := range updateV.(*types.Document).Map() {
// if err := d.Set(k, v); err != nil {
// return nil, lazyerrors.Error(err)
// }
// }
// default:
// return nil, lazyerrors.Errorf("unhandled operation %q", updateOp)
// }
// }

// if err = updateDocs.Set(i, d); err != nil {
// return nil, lazyerrors.Error(err)
// }
// }

// for i := 0; i < updateDocs.Len(); i++ {
// updateDoc, err := updateDocs.Get(i)
// if err != nil {
// return nil, lazyerrors.Error(err)
// }

// sql = fmt.Sprintf("UPDATE %s SET _jsonb = $1 WHERE _jsonb->'_id' = $2", pgx.Identifier{db, collection}.Sanitize())
// d := updateDoc.(*types.Document)
// db, err := fjson.Marshal(d)
// if err != nil {
// return nil, err
// }

// idb, err := fjson.Marshal(d.Map()["_id"].(types.ObjectID))
// if err != nil {
// return nil, err
// }
// tag, err := s.pgPool.Exec(ctx, sql, db, idb)
// if err != nil {
// return nil, err
// }

// updated += int32(tag.RowsAffected())
// }
// }

// var reply wire.OpMsg
// err = reply.SetSections(wire.OpMsgSection{
// Documents: []*types.Document{types.MustNewDocument(
// "n", selected,
// "nModified", updated,
// "ok", float64(1),
// )},
// })
// if err != nil {
// return nil, lazyerrors.Error(err)
// }

// return &reply, nil
document, err := msg.Document()
if err != nil {
return nil, lazyerrors.Error(err)
}

if err := common.Unimplemented(document, "let"); err != nil {
return nil, err
}
common.Ignored(document, s.l, "ordered", "writeConcern", "bypassDocumentValidation", "comment")

command := document.Command()

var db, collection string
if db, err = common.GetRequiredParam[string](document, "$db"); err != nil {
return nil, err
}
if collection, err = common.GetRequiredParam[string](document, command); err != nil {
return nil, err
}

var updates *types.Array
if updates, err = common.GetOptionalParam(document, "updates", updates); err != nil {
return nil, err
}

var selected, updated int32
for i := 0; i < updates.Len(); i++ {
update, err := common.AssertType[*types.Document](must.NotFail(updates.Get(i)))
if err != nil {
return nil, err
}

unimplementedFields := []string{
"c",
"upsert",
"multi",
"collation",
"arrayFilters",
"hint",
}
if err := common.Unimplemented(update, unimplementedFields...); err != nil {
return nil, err
}

var q, u *types.Document
if q, err = common.GetOptionalParam(update, "q", q); err != nil {
return nil, err
}
if u, err = common.GetOptionalParam(update, "u", u); err != nil {
return nil, err
}

fetchedDocs, err := s.fetch(ctx, db, collection)
if err != nil {
return nil, err
}

resDocs := make([]*types.Document, 0, 16)
for _, doc := range fetchedDocs {
matches, err := common.FilterDocument(doc, q)
if err != nil {
return nil, err
}

if !matches {
continue
}

resDocs = append(resDocs, doc)
}

if len(resDocs) == 0 {
continue
}

selected += int32(len(resDocs))

for _, doc := range resDocs {
for _, updateOp := range u.Keys() {
updateV := must.NotFail(u.Get(updateOp))
switch updateOp {
case "$set":
setDoc, err := common.AssertType[*types.Document](updateV)
if err != nil {
return nil, err
}

for _, setKey := range setDoc.Keys() {
setValue := must.NotFail(setDoc.Get(setKey))
if err = doc.Set(setKey, setValue); err != nil {
return nil, lazyerrors.Error(err)
}
}

default:
return nil, lazyerrors.Errorf("unhandled operation %q", updateOp)
}
}

sql := fmt.Sprintf("UPDATE %s SET _jsonb = $1 WHERE _jsonb->'_id' = $2", pgx.Identifier{db, collection}.Sanitize())
id := must.NotFail(doc.Get("_id"))
tag, err := s.pgPool.Exec(ctx, sql, must.NotFail(fjson.Marshal(doc)), must.NotFail(fjson.Marshal(id)))
if err != nil {
return nil, err
}

updated += int32(tag.RowsAffected())
}
}

var reply wire.OpMsg
err = reply.SetSections(wire.OpMsgSection{
Documents: []*types.Document{types.MustNewDocument(
"n", selected,
"nModified", updated,
"ok", float64(1),
)},
})
if err != nil {
return nil, lazyerrors.Error(err)
}

return &reply, nil
}

0 comments on commit 0ec1502

Please sign in to comment.