Skip to content

Commit

Permalink
Merge branch 'main' into issue-693-new-validation
Browse files Browse the repository at this point in the history
  • Loading branch information
AlekSi committed Oct 17, 2022
2 parents 0fe5803 + c363ece commit e67286e
Show file tree
Hide file tree
Showing 28 changed files with 58 additions and 50 deletions.
2 changes: 1 addition & 1 deletion internal/handlers/pg/msg_collstats.go
Expand Up @@ -41,7 +41,7 @@ func (h *Handler) MsgCollStats(ctx context.Context, msg *wire.OpMsg) (*wire.OpMs
return nil, err
}

stats, err := h.pgPool.SchemaStats(ctx, db, collection)
stats, err := h.PgPool.SchemaStats(ctx, db, collection)
if err != nil {
return nil, lazyerrors.Error(err)
}
Expand Down
7 changes: 4 additions & 3 deletions internal/handlers/pg/msg_count.go
Expand Up @@ -42,12 +42,13 @@ func (h *Handler) MsgCount(ctx context.Context, msg *wire.OpMsg) (*wire.OpMsg, e
if err := common.Unimplemented(document, unimplementedFields...); err != nil {
return nil, err
}

ignoredFields := []string{
"hint",
"readConcern",
"comment",
}
common.Ignored(document, h.l, ignoredFields...)
common.Ignored(document, h.L, ignoredFields...)

var filter *types.Document
if filter, err = common.GetOptionalParam(document, "query", filter); err != nil {
Expand Down Expand Up @@ -83,8 +84,8 @@ func (h *Handler) MsgCount(ctx context.Context, msg *wire.OpMsg) (*wire.OpMsg, e
sp.Filter = filter

resDocs := make([]*types.Document, 0, 16)
err = h.pgPool.InTransaction(ctx, func(tx pgx.Tx) error {
fetchedChan, err := h.pgPool.QueryDocuments(ctx, tx, &sp)
err = h.PgPool.InTransaction(ctx, func(tx pgx.Tx) error {
fetchedChan, err := h.PgPool.QueryDocuments(ctx, tx, &sp)
if err != nil {
return err
}
Expand Down
5 changes: 3 additions & 2 deletions internal/handlers/pg/msg_create.go
Expand Up @@ -52,14 +52,15 @@ func (h *Handler) MsgCreate(ctx context.Context, msg *wire.OpMsg) (*wire.OpMsg,
if err := common.Unimplemented(document, unimplementedFields...); err != nil {
return nil, err
}

ignoredFields := []string{
"autoIndexId",
"storageEngine",
"indexOptionDefaults",
"writeConcern",
"comment",
}
common.Ignored(document, h.l, ignoredFields...)
common.Ignored(document, h.L, ignoredFields...)

command := document.Command()

Expand All @@ -71,7 +72,7 @@ func (h *Handler) MsgCreate(ctx context.Context, msg *wire.OpMsg) (*wire.OpMsg,
return nil, err
}

err = h.pgPool.InTransaction(ctx, func(tx pgx.Tx) error {
err = h.PgPool.InTransaction(ctx, func(tx pgx.Tx) error {
if err := pgdb.CreateDatabaseIfNotExists(ctx, tx, db); err != nil {
if errors.Is(pgdb.ErrInvalidDatabaseName, err) {
msg := fmt.Sprintf("Invalid namespace: %s.%s", db, collection)
Expand Down
2 changes: 1 addition & 1 deletion internal/handlers/pg/msg_createindexes.go
Expand Up @@ -33,7 +33,7 @@ func (h *Handler) MsgCreateIndexes(ctx context.Context, msg *wire.OpMsg) (*wire.
return nil, err
}

common.Ignored(document, h.l, "writeConcern", "commitQuorum", "comment")
common.Ignored(document, h.L, "writeConcern", "commitQuorum", "comment")

var reply wire.OpMsg
err = reply.SetSections(wire.OpMsgSection{
Expand Down
5 changes: 3 additions & 2 deletions internal/handlers/pg/msg_datasize.go
Expand Up @@ -40,7 +40,8 @@ func (h *Handler) MsgDataSize(ctx context.Context, msg *wire.OpMsg) (*wire.OpMsg
if err := common.Unimplemented(document, "keyPattern", "min", "max"); err != nil {
return nil, err
}
common.Ignored(document, h.l, "estimate")

common.Ignored(document, h.L, "estimate")

m := document.Map()
target, ok := m["dataSize"].(string)
Expand All @@ -54,7 +55,7 @@ func (h *Handler) MsgDataSize(ctx context.Context, msg *wire.OpMsg) (*wire.OpMsg
db, collection := targets[0], targets[1]

started := time.Now()
stats, err := h.pgPool.SchemaStats(ctx, db, collection)
stats, err := h.PgPool.SchemaStats(ctx, db, collection)
elapses := time.Since(started)

addEstimate := true
Expand Down
2 changes: 1 addition & 1 deletion internal/handlers/pg/msg_dbstats.go
Expand Up @@ -42,7 +42,7 @@ func (h *Handler) MsgDBStats(ctx context.Context, msg *wire.OpMsg) (*wire.OpMsg,
scale = 1
}

stats, err := h.pgPool.SchemaStats(ctx, db, "")
stats, err := h.PgPool.SchemaStats(ctx, db, "")
if err != nil {
return nil, lazyerrors.Error(err)
}
Expand Down
9 changes: 5 additions & 4 deletions internal/handlers/pg/msg_delete.go
Expand Up @@ -38,7 +38,8 @@ func (h *Handler) MsgDelete(ctx context.Context, msg *wire.OpMsg) (*wire.OpMsg,
if err := common.Unimplemented(document, "let"); err != nil {
return nil, err
}
common.Ignored(document, h.l, "writeConcern")

common.Ignored(document, h.L, "writeConcern")

var deletes *types.Array
if deletes, err = common.GetOptionalParam(document, "deletes", deletes); err != nil {
Expand Down Expand Up @@ -173,9 +174,9 @@ func (h *Handler) execDelete(ctx context.Context, sp *pgdb.SQLParam, filter *typ
resDocs := make([]*types.Document, 0, 16)

var deleted int32
err = h.pgPool.InTransaction(ctx, func(tx pgx.Tx) error {
err = h.PgPool.InTransaction(ctx, func(tx pgx.Tx) error {
// fetch current items from collection
fetchedChan, err := h.pgPool.QueryDocuments(ctx, tx, sp)
fetchedChan, err := h.PgPool.QueryDocuments(ctx, tx, sp)
if err != nil {
return err
}
Expand Down Expand Up @@ -241,7 +242,7 @@ func (h *Handler) delete(ctx context.Context, sp *pgdb.SQLParam, docs []*types.D
}

var rowsDeleted int64
err := h.pgPool.InTransaction(ctx, func(tx pgx.Tx) error {
err := h.PgPool.InTransaction(ctx, func(tx pgx.Tx) error {
var err error
rowsDeleted, err = pgdb.DeleteDocumentsByID(ctx, tx, sp, ids)
return err
Expand Down
4 changes: 2 additions & 2 deletions internal/handlers/pg/msg_drop.go
Expand Up @@ -35,7 +35,7 @@ func (h *Handler) MsgDrop(ctx context.Context, msg *wire.OpMsg) (*wire.OpMsg, er
return nil, lazyerrors.Error(err)
}

common.Ignored(document, h.l, "writeConcern", "comment")
common.Ignored(document, h.L, "writeConcern", "comment")

command := document.Command()

Expand All @@ -47,7 +47,7 @@ func (h *Handler) MsgDrop(ctx context.Context, msg *wire.OpMsg) (*wire.OpMsg, er
return nil, err
}

err = h.pgPool.InTransaction(ctx, func(tx pgx.Tx) error {
err = h.PgPool.InTransaction(ctx, func(tx pgx.Tx) error {
return pgdb.DropCollection(ctx, tx, db, collection)
})

Expand Down
4 changes: 2 additions & 2 deletions internal/handlers/pg/msg_dropdatabase.go
Expand Up @@ -35,7 +35,7 @@ func (h *Handler) MsgDropDatabase(ctx context.Context, msg *wire.OpMsg) (*wire.O
return nil, lazyerrors.Error(err)
}

common.Ignored(document, h.l, "writeConcern", "comment")
common.Ignored(document, h.L, "writeConcern", "comment")

var db string
if db, err = common.GetRequiredParam[string](document, "$db"); err != nil {
Expand All @@ -44,7 +44,7 @@ func (h *Handler) MsgDropDatabase(ctx context.Context, msg *wire.OpMsg) (*wire.O

res := must.NotFail(types.NewDocument())

err = h.pgPool.InTransaction(ctx, func(tx pgx.Tx) error {
err = h.PgPool.InTransaction(ctx, func(tx pgx.Tx) error {
return pgdb.DropDatabase(ctx, tx, db)
})

Expand Down
4 changes: 2 additions & 2 deletions internal/handlers/pg/msg_explain.go
Expand Up @@ -43,7 +43,7 @@ func (h *Handler) MsgExplain(ctx context.Context, msg *wire.OpMsg) (*wire.OpMsg,
return nil, lazyerrors.Error(err)
}

common.Ignored(document, h.l, "verbosity")
common.Ignored(document, h.L, "verbosity")

command, err := common.GetRequiredParam[*types.Document](document, document.Command())
if err != nil {
Expand All @@ -67,7 +67,7 @@ func (h *Handler) MsgExplain(ctx context.Context, msg *wire.OpMsg) (*wire.OpMsg,
}

var queryPlanner *types.Array
err = h.pgPool.InTransaction(ctx, func(tx pgx.Tx) error {
err = h.PgPool.InTransaction(ctx, func(tx pgx.Tx) error {
var err error
queryPlanner, err = pgdb.Explain(ctx, tx, sp)
return err
Expand Down
7 changes: 4 additions & 3 deletions internal/handlers/pg/msg_find.go
Expand Up @@ -52,6 +52,7 @@ func (h *Handler) MsgFind(ctx context.Context, msg *wire.OpMsg) (*wire.OpMsg, er
if err := common.Unimplemented(document, unimplementedFields...); err != nil {
return nil, err
}

ignoredFields := []string{
"hint",
"batchSize",
Expand All @@ -60,7 +61,7 @@ func (h *Handler) MsgFind(ctx context.Context, msg *wire.OpMsg) (*wire.OpMsg, er
"max",
"min",
}
common.Ignored(document, h.l, ignoredFields...)
common.Ignored(document, h.L, ignoredFields...)

var filter, sort, projection *types.Document
if filter, err = common.GetOptionalParam(document, "filter", filter); err != nil {
Expand Down Expand Up @@ -125,8 +126,8 @@ func (h *Handler) MsgFind(ctx context.Context, msg *wire.OpMsg) (*wire.OpMsg, er
}

resDocs := make([]*types.Document, 0, 16)
err = h.pgPool.InTransaction(ctx, func(tx pgx.Tx) error {
fetchedChan, err := h.pgPool.QueryDocuments(ctx, tx, &sp)
err = h.PgPool.InTransaction(ctx, func(tx pgx.Tx) error {
fetchedChan, err := h.PgPool.QueryDocuments(ctx, tx, &sp)
if err != nil {
return err
}
Expand Down
6 changes: 3 additions & 3 deletions internal/handlers/pg/msg_findandmodify.go
Expand Up @@ -50,7 +50,7 @@ func (h *Handler) MsgFindAndModify(ctx context.Context, msg *wire.OpMsg) (*wire.
"collation",
"hint",
}
common.Ignored(document, h.l, ignoredFields...)
common.Ignored(document, h.L, ignoredFields...)

params, err := common.PrepareFindAndModifyParams(document)
if err != nil {
Expand All @@ -74,10 +74,10 @@ func (h *Handler) MsgFindAndModify(ctx context.Context, msg *wire.OpMsg) (*wire.
// This is not very optimal as we need to fetch everything from the database to have a proper sort.
// We might consider rewriting it later.
var reply wire.OpMsg
err = h.pgPool.InTransaction(ctx, func(tx pgx.Tx) error {
err = h.PgPool.InTransaction(ctx, func(tx pgx.Tx) error {
resDocs := make([]*types.Document, 0, 16)

fetchedChan, err := h.pgPool.QueryDocuments(ctx, tx, &sqlParam)
fetchedChan, err := h.PgPool.QueryDocuments(ctx, tx, &sqlParam)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion internal/handlers/pg/msg_getlog.go
Expand Up @@ -84,7 +84,7 @@ func (h *Handler) MsgGetLog(ctx context.Context, msg *wire.OpMsg) (*wire.OpMsg,

case "startupWarnings":
var pv string
err = h.pgPool.QueryRow(ctx, "SHOW server_version").Scan(&pv)
err = h.PgPool.QueryRow(ctx, "SHOW server_version").Scan(&pv)
if err != nil {
return nil, lazyerrors.Error(err)
}
Expand Down
2 changes: 1 addition & 1 deletion internal/handlers/pg/msg_getparameter.go
Expand Up @@ -79,7 +79,7 @@ func (h *Handler) MsgGetParameter(ctx context.Context, msg *wire.OpMsg) (*wire.O
return nil, lazyerrors.Error(err)
}

common.Ignored(document, h.l, "comment")
common.Ignored(document, h.L, "comment")

if resDoc.Len() < 2 {
return &reply, common.NewErrorMsg(common.ErrorCode(0), "no option found to get")
Expand Down
2 changes: 1 addition & 1 deletion internal/handlers/pg/msg_hello.go
Expand Up @@ -26,7 +26,7 @@ import (

// MsgHello implements HandlerInterface.
func (h *Handler) MsgHello(ctx context.Context, msg *wire.OpMsg) (*wire.OpMsg, error) {
if err := h.pgPool.Ping(ctx); err != nil {
if err := h.PgPool.Ping(ctx); err != nil {
return nil, err
}

Expand Down
4 changes: 2 additions & 2 deletions internal/handlers/pg/msg_insert.go
Expand Up @@ -36,7 +36,7 @@ func (h *Handler) MsgInsert(ctx context.Context, msg *wire.OpMsg) (*wire.OpMsg,
return nil, lazyerrors.Error(err)
}

common.Ignored(document, h.l, "ordered", "writeConcern", "bypassDocumentValidation", "comment")
common.Ignored(document, h.L, "ordered", "writeConcern", "bypassDocumentValidation", "comment")

var sp pgdb.SQLParam

Expand All @@ -63,7 +63,7 @@ func (h *Handler) MsgInsert(ctx context.Context, msg *wire.OpMsg) (*wire.OpMsg,
}

var inserted int32
err = h.pgPool.InTransaction(ctx, func(tx pgx.Tx) error {
err = h.PgPool.InTransaction(ctx, func(tx pgx.Tx) error {
for i := 0; i < docs.Len(); i++ {
doc, err := docs.Get(i)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion internal/handlers/pg/msg_ismaster.go
Expand Up @@ -26,7 +26,7 @@ import (

// MsgIsMaster implements HandlerInterface.
func (h *Handler) MsgIsMaster(ctx context.Context, msg *wire.OpMsg) (*wire.OpMsg, error) {
if err := h.pgPool.Ping(ctx); err != nil {
if err := h.PgPool.Ping(ctx); err != nil {
return nil, err
}

Expand Down
4 changes: 2 additions & 2 deletions internal/handlers/pg/msg_listcollections.go
Expand Up @@ -50,7 +50,7 @@ func (h *Handler) MsgListCollections(ctx context.Context, msg *wire.OpMsg) (*wir
// return nil, err
// }

common.Ignored(document, h.l, "comment", "authorizedCollections")
common.Ignored(document, h.L, "comment", "authorizedCollections")

var db string
if db, err = common.GetRequiredParam[string](document, "$db"); err != nil {
Expand All @@ -59,7 +59,7 @@ func (h *Handler) MsgListCollections(ctx context.Context, msg *wire.OpMsg) (*wir

var names []string

err = h.pgPool.InTransaction(ctx, func(tx pgx.Tx) error {
err = h.PgPool.InTransaction(ctx, func(tx pgx.Tx) error {
var err error

names, err = pgdb.Collections(ctx, tx, db)
Expand Down
6 changes: 3 additions & 3 deletions internal/handlers/pg/msg_listdatabases.go
Expand Up @@ -39,7 +39,7 @@ func (h *Handler) MsgListDatabases(ctx context.Context, msg *wire.OpMsg) (*wire.
return nil, err
}

common.Ignored(document, h.l, "comment", "authorizedDatabases")
common.Ignored(document, h.L, "comment", "authorizedDatabases")

nameOnly, err := common.GetBoolOptionalParam(document, "nameOnly")
if err != nil {
Expand All @@ -48,7 +48,7 @@ func (h *Handler) MsgListDatabases(ctx context.Context, msg *wire.OpMsg) (*wire.

var totalSize int64
var databases *types.Array
err = h.pgPool.InTransaction(ctx, func(tx pgx.Tx) error {
err = h.PgPool.InTransaction(ctx, func(tx pgx.Tx) error {
var databaseNames []string
var err error
databaseNames, err = pgdb.Databases(ctx, tx)
Expand Down Expand Up @@ -106,7 +106,7 @@ func (h *Handler) MsgListDatabases(ctx context.Context, msg *wire.OpMsg) (*wire.
return nil
}

return h.pgPool.QueryRow(ctx, "SELECT pg_database_size(current_database())").Scan(&totalSize)
return h.PgPool.QueryRow(ctx, "SELECT pg_database_size(current_database())").Scan(&totalSize)
})
if err != nil {
return nil, err
Expand Down
2 changes: 1 addition & 1 deletion internal/handlers/pg/msg_ping.go
Expand Up @@ -25,7 +25,7 @@ import (

// MsgPing implements HandlerInterface.
func (h *Handler) MsgPing(ctx context.Context, msg *wire.OpMsg) (*wire.OpMsg, error) {
if err := h.pgPool.Ping(ctx); err != nil {
if err := h.PgPool.Ping(ctx); err != nil {
return nil, err
}

Expand Down
2 changes: 1 addition & 1 deletion internal/handlers/pg/msg_serverstatus.go
Expand Up @@ -31,7 +31,7 @@ func (h *Handler) MsgServerStatus(ctx context.Context, msg *wire.OpMsg) (*wire.O
return nil, lazyerrors.Error(err)
}

stats, err := h.pgPool.SchemaStats(ctx, "", "")
stats, err := h.PgPool.SchemaStats(ctx, "", "")
if err != nil {
return nil, lazyerrors.Error(err)
}
Expand Down
11 changes: 6 additions & 5 deletions internal/handlers/pg/msg_update.go
Expand Up @@ -40,7 +40,8 @@ func (h *Handler) MsgUpdate(ctx context.Context, msg *wire.OpMsg) (*wire.OpMsg,
if err := common.Unimplemented(document, "let"); err != nil {
return nil, err
}
common.Ignored(document, h.l, "ordered", "writeConcern", "bypassDocumentValidation")

common.Ignored(document, h.L, "ordered", "writeConcern", "bypassDocumentValidation")

var sp pgdb.SQLParam

Expand All @@ -67,7 +68,7 @@ func (h *Handler) MsgUpdate(ctx context.Context, msg *wire.OpMsg) (*wire.OpMsg,
}

var created bool
err = h.pgPool.InTransaction(ctx, func(tx pgx.Tx) error {
err = h.PgPool.InTransaction(ctx, func(tx pgx.Tx) error {
created, err = pgdb.CreateCollectionIfNotExist(ctx, tx, sp.DB, sp.Collection)
return err
})
Expand All @@ -80,13 +81,13 @@ func (h *Handler) MsgUpdate(ctx context.Context, msg *wire.OpMsg) (*wire.OpMsg,
return nil, err
}
if created {
h.l.Info("Created table.", zap.String("schema", sp.DB), zap.String("table", sp.Collection))
h.L.Info("Created table.", zap.String("schema", sp.DB), zap.String("table", sp.Collection))
}

var matched, modified int32
var upserted types.Array

err = h.pgPool.InTransaction(ctx, func(tx pgx.Tx) error {
err = h.PgPool.InTransaction(ctx, func(tx pgx.Tx) error {
for i := 0; i < updates.Len(); i++ {
update, err := common.AssertType[*types.Document](must.NotFail(updates.Get(i)))
if err != nil {
Expand Down Expand Up @@ -141,7 +142,7 @@ func (h *Handler) MsgUpdate(ctx context.Context, msg *wire.OpMsg) (*wire.OpMsg,
sp.Filter = q

resDocs := make([]*types.Document, 0, 16)
fetchedChan, err := h.pgPool.QueryDocuments(ctx, tx, &sp)
fetchedChan, err := h.PgPool.QueryDocuments(ctx, tx, &sp)
if err != nil {
return err
}
Expand Down

0 comments on commit e67286e

Please sign in to comment.