Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replace pgxtype.Querier with pgx.Tx #1188

Merged
merged 8 commits into from
Sep 28, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ require (
github.com/alecthomas/kong v0.6.1
github.com/jackc/pgconn v1.13.0
github.com/jackc/pgerrcode v0.0.0-20220416144525-469b46aa5efa
github.com/jackc/pgtype v1.12.0
github.com/jackc/pgx/v4 v4.17.2
github.com/pmezard/go-difflib v1.0.0
github.com/prometheus/client_golang v1.13.0
Expand Down Expand Up @@ -42,6 +41,7 @@ require (
github.com/jackc/pgpassfile v1.0.0 // indirect
github.com/jackc/pgproto3/v2 v2.3.1 // indirect
github.com/jackc/pgservicefile v0.0.0-20200714003250-2b9c44734f2b // indirect
github.com/jackc/pgtype v1.12.0 // indirect
github.com/jackc/puddle v1.3.0 // indirect
github.com/josharian/intern v1.0.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
Expand Down
2 changes: 1 addition & 1 deletion internal/handlers/pg/msg_listcollections.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func (h *Handler) MsgListCollections(ctx context.Context, msg *wire.OpMsg) (*wir
err = h.pgPool.InTransaction(ctx, func(tx pgx.Tx) error {
var err error

names, err = pgdb.Collections(ctx, h.pgPool, db)
names, err = pgdb.Collections(ctx, tx, db)
if err != nil && !errors.Is(err, pgdb.ErrSchemaNotExist) {
return lazyerrors.Error(err)
}
Expand Down
33 changes: 16 additions & 17 deletions internal/handlers/pg/pgdb/collections.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (

"github.com/jackc/pgconn"
"github.com/jackc/pgerrcode"
"github.com/jackc/pgtype/pgxtype"
"github.com/jackc/pgx/v4"
"golang.org/x/exp/slices"

Expand All @@ -37,8 +36,8 @@ var validateCollectionNameRe = regexp.MustCompile("^[a-zA-Z_][a-zA-Z0-9_]{0,119}
// Collections returns a sorted list of FerretDB collection names.
//
// It returns (possibly wrapped) ErrSchemaNotExist if FerretDB database / PostgreSQL schema does not exist.
func Collections(ctx context.Context, querier pgxtype.Querier, db string) ([]string, error) {
schemaExists, err := schemaExists(ctx, querier, db)
func Collections(ctx context.Context, tx pgx.Tx, db string) ([]string, error) {
schemaExists, err := schemaExists(ctx, tx, db)
if err != nil {
return nil, lazyerrors.Error(err)
}
Expand All @@ -47,7 +46,7 @@ func Collections(ctx context.Context, querier pgxtype.Querier, db string) ([]str
return nil, ErrSchemaNotExist
}

settings, err := getSettingsTable(ctx, querier, db)
settings, err := getSettingsTable(ctx, tx, db)
if err != nil {
return nil, lazyerrors.Error(err)
}
Expand All @@ -67,8 +66,8 @@ func Collections(ctx context.Context, querier pgxtype.Querier, db string) ([]str
}

// CollectionExists returns true if FerretDB collection exists.
func CollectionExists(ctx context.Context, querier pgxtype.Querier, db, collection string) (bool, error) {
collections, err := Collections(ctx, querier, db)
func CollectionExists(ctx context.Context, tx pgx.Tx, db, collection string) (bool, error) {
collections, err := Collections(ctx, tx, db)
if err != nil {
if errors.Is(err, ErrSchemaNotExist) {
return false, nil
Expand All @@ -87,13 +86,13 @@ func CollectionExists(ctx context.Context, querier pgxtype.Querier, db, collecti
// - ErrTableNotExist - is the required FerretDB database does not exist.
//
// Please use errors.Is to check the error.
func CreateCollection(ctx context.Context, querier pgxtype.Querier, db, collection string) error {
func CreateCollection(ctx context.Context, tx pgx.Tx, db, collection string) error {
if !validateCollectionNameRe.MatchString(collection) ||
strings.HasPrefix(collection, reservedPrefix) {
return ErrInvalidTableName
}

schemaExists, err := schemaExists(ctx, querier, db)
schemaExists, err := schemaExists(ctx, tx, db)
if err != nil {
return lazyerrors.Error(err)
}
Expand All @@ -103,15 +102,15 @@ func CreateCollection(ctx context.Context, querier pgxtype.Querier, db, collecti
}

table := formatCollectionName(collection)
tables, err := tables(ctx, querier, db)
tables, err := tables(ctx, tx, db)
if err != nil {
return err
}
if slices.Contains(tables, table) {
return ErrAlreadyExist
}

settings, err := getSettingsTable(ctx, querier, db)
settings, err := getSettingsTable(ctx, tx, db)
if err != nil {
return lazyerrors.Error(err)
}
Expand All @@ -130,13 +129,13 @@ func CreateCollection(ctx context.Context, querier pgxtype.Querier, db, collecti
must.NoError(collections.Set(collection, table))
must.NoError(settings.Set("collections", collections))

err = updateSettingsTable(ctx, querier, db, settings)
err = updateSettingsTable(ctx, tx, db, settings)
if err != nil {
return lazyerrors.Error(err)
}

sql := `CREATE TABLE IF NOT EXISTS ` + pgx.Identifier{db, table}.Sanitize() + ` (_jsonb jsonb)`
if _, err = querier.Exec(ctx, sql); err == nil {
if _, err = tx.Exec(ctx, sql); err == nil {
return nil
}

Expand Down Expand Up @@ -192,8 +191,8 @@ func CreateCollectionIfNotExist(ctx context.Context, tx pgx.Tx, db, collection s
//
// It returns (possibly wrapped) ErrTableNotExist if database or collection does not exist.
// Please use errors.Is to check the error.
func DropCollection(ctx context.Context, querier pgxtype.Querier, db, collection string) error {
schemaExists, err := schemaExists(ctx, querier, db)
func DropCollection(ctx context.Context, tx pgx.Tx, db, collection string) error {
schemaExists, err := schemaExists(ctx, tx, db)
if err != nil {
return lazyerrors.Error(err)
}
Expand All @@ -203,15 +202,15 @@ func DropCollection(ctx context.Context, querier pgxtype.Querier, db, collection
}

table := formatCollectionName(collection)
tables, err := tables(ctx, querier, db)
tables, err := tables(ctx, tx, db)
if err != nil {
return lazyerrors.Error(err)
}
if !slices.Contains(tables, table) {
return ErrTableNotExist
}

err = removeTableFromSettings(ctx, querier, db, collection)
err = removeTableFromSettings(ctx, tx, db, collection)
if err != nil && !errors.Is(err, ErrTableNotExist) {
return lazyerrors.Error(err)
}
Expand All @@ -221,7 +220,7 @@ func DropCollection(ctx context.Context, querier pgxtype.Querier, db, collection

// TODO https://github.com/FerretDB/FerretDB/issues/811
sql := `DROP TABLE IF EXISTS ` + pgx.Identifier{db, table}.Sanitize() + ` CASCADE`
_, err = querier.Exec(ctx, sql)
_, err = tx.Exec(ctx, sql)
if err != nil {
return lazyerrors.Error(err)
}
Expand Down
74 changes: 57 additions & 17 deletions internal/handlers/pg/pgdb/pgdb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,27 +90,39 @@ func TestCreateDrop(t *testing.T) {
collectionName := testutil.CollectionName(t)
setupDatabase(ctx, t, pool, databaseName)

err := DropCollection(ctx, pool, databaseName, collectionName)
require.Equal(t, ErrSchemaNotExist, err)
err := pool.InTransaction(ctx, func(tx pgx.Tx) error {
return DropCollection(ctx, tx, databaseName, collectionName)
})
require.ErrorIs(t, err, ErrSchemaNotExist)

err = pool.InTransaction(ctx, func(tx pgx.Tx) error {
return DropDatabase(ctx, tx, databaseName)
})
require.ErrorIs(t, err, ErrSchemaNotExist)

err = CreateCollection(ctx, pool, databaseName, collectionName)
err = pool.InTransaction(ctx, func(tx pgx.Tx) error {
return CreateCollection(ctx, tx, databaseName, collectionName)
})
require.ErrorIs(t, err, ErrSchemaNotExist)

err = pool.InTransaction(ctx, func(tx pgx.Tx) error {
return CreateDatabaseIfNotExists(ctx, tx, databaseName)
})
require.NoError(t, err)

exists, err := CollectionExists(ctx, pool, databaseName, collectionName)
var exists bool
err = pool.InTransaction(ctx, func(tx pgx.Tx) error {
exists, err = CollectionExists(ctx, tx, databaseName, collectionName)
return err
})
require.NoError(t, err)
assert.False(t, exists)

collections, err := Collections(ctx, pool, databaseName)
var collections []string
err = pool.InTransaction(ctx, func(tx pgx.Tx) error {
collections, err = Collections(ctx, tx, databaseName)
return err
})
require.NoError(t, err)
assert.Empty(t, collections)
})
Expand All @@ -132,22 +144,36 @@ func TestCreateDrop(t *testing.T) {
})
require.NoError(t, err)

exists, err := CollectionExists(ctx, pool, databaseName, collectionName)
var exists bool
err = pool.InTransaction(ctx, func(tx pgx.Tx) error {
exists, err = CollectionExists(ctx, tx, databaseName, collectionName)
return err
})
require.NoError(t, err)
assert.False(t, exists)

err = DropCollection(ctx, pool, databaseName, collectionName)
err = pool.InTransaction(ctx, func(tx pgx.Tx) error {
return DropCollection(ctx, tx, databaseName, collectionName)
})
require.ErrorIs(t, err, ErrTableNotExist)

err = CreateCollection(ctx, pool, databaseName, collectionName)
err = pool.InTransaction(ctx, func(tx pgx.Tx) error {
return CreateCollection(ctx, tx, databaseName, collectionName)
})
require.NoError(t, err)

exists, err = CollectionExists(ctx, pool, databaseName, collectionName)
err = pool.InTransaction(ctx, func(tx pgx.Tx) error {
exists, err = CollectionExists(ctx, tx, databaseName, collectionName)
return err
})
require.NoError(t, err)
assert.True(t, exists)

collections, err := Collections(ctx, pool, databaseName)
require.NoError(t, err)
var collections []string
err = pool.InTransaction(ctx, func(tx pgx.Tx) error {
collections, err = Collections(ctx, tx, databaseName)
return err
})
assert.Equal(t, []string{collectionName}, collections)

err = pool.InTransaction(ctx, func(tx pgx.Tx) error {
Expand All @@ -173,20 +199,32 @@ func TestCreateDrop(t *testing.T) {
})
require.NoError(t, err)

err = CreateCollection(ctx, pool, databaseName, collectionName)
err = pool.InTransaction(ctx, func(tx pgx.Tx) error {
return CreateCollection(ctx, tx, databaseName, collectionName)
})
require.NoError(t, err)

collections, err := Collections(ctx, pool, databaseName)
var collections []string
err = pool.InTransaction(ctx, func(tx pgx.Tx) error {
collections, err = Collections(ctx, tx, databaseName)
return err
})
require.NoError(t, err)
assert.Equal(t, []string{collectionName}, collections)

err = CreateCollection(ctx, pool, databaseName, collectionName)
err = pool.InTransaction(ctx, func(tx pgx.Tx) error {
return CreateCollection(ctx, tx, databaseName, collectionName)
})
require.ErrorIs(t, err, ErrAlreadyExist)

err = DropCollection(ctx, pool, databaseName, collectionName)
err = pool.InTransaction(ctx, func(tx pgx.Tx) error {
return DropCollection(ctx, tx, databaseName, collectionName)
})
require.NoError(t, err)

err = DropCollection(ctx, pool, databaseName, collectionName)
err = pool.InTransaction(ctx, func(tx pgx.Tx) error {
return DropCollection(ctx, tx, databaseName, collectionName)
})
require.ErrorIs(t, err, ErrTableNotExist)
})
}
Expand Down Expand Up @@ -247,7 +285,9 @@ func TestCreateCollectionIfNotExist(t *testing.T) {
})
require.NoError(t, err)

err = CreateCollection(ctx, pool, databaseName, collectionName)
err = pool.InTransaction(ctx, func(tx pgx.Tx) error {
return CreateCollection(ctx, tx, databaseName, collectionName)
})
require.NoError(t, err)

var created bool
Expand Down
13 changes: 6 additions & 7 deletions internal/handlers/pg/pgdb/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"fmt"
"strings"

"github.com/jackc/pgtype/pgxtype"
"github.com/jackc/pgx/v4"
"golang.org/x/exp/maps"

Expand Down Expand Up @@ -107,13 +106,13 @@ func (pgPool *Pool) QueryDocuments(ctx context.Context, tx pgx.Tx, sp *SQLParam)
}

// Explain returns SQL EXPLAIN results for given query parameters.
func Explain(ctx context.Context, querier pgxtype.Querier, sp SQLParam) (*types.Array, error) {
q, err := buildQuery(ctx, querier, &sp)
func Explain(ctx context.Context, tx pgx.Tx, sp SQLParam) (*types.Array, error) {
q, err := buildQuery(ctx, tx, &sp)
if err != nil {
return nil, lazyerrors.Error(err)
}

rows, err := querier.Query(ctx, q)
rows, err := tx.Query(ctx, q)
if err != nil {
return nil, lazyerrors.Error(err)
}
Expand Down Expand Up @@ -150,16 +149,16 @@ func Explain(ctx context.Context, querier pgxtype.Querier, sp SQLParam) (*types.
//
// It returns (possibly wrapped) ErrSchemaNotExist or ErrTableNotExist
// if schema/database or table/collection does not exist.
func buildQuery(ctx context.Context, querier pgxtype.Querier, sp *SQLParam) (string, error) {
exists, err := CollectionExists(ctx, querier, sp.DB, sp.Collection)
func buildQuery(ctx context.Context, tx pgx.Tx, sp *SQLParam) (string, error) {
exists, err := CollectionExists(ctx, tx, sp.DB, sp.Collection)
if err != nil {
return "", lazyerrors.Error(err)
}
if !exists {
return "", lazyerrors.Error(ErrTableNotExist)
}

table, err := getTableName(ctx, querier, sp.DB, sp.Collection)
table, err := getTableName(ctx, tx, sp.DB, sp.Collection)
if err != nil {
return "", lazyerrors.Error(err)
}
Expand Down
6 changes: 3 additions & 3 deletions internal/handlers/pg/pgdb/schemas.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,16 @@ package pgdb
import (
"context"

"github.com/jackc/pgtype/pgxtype"
"github.com/jackc/pgx/v4"

"github.com/FerretDB/FerretDB/internal/util/lazyerrors"
"github.com/FerretDB/FerretDB/internal/util/must"
)

// schemaExists returns true if given schema exists.
func schemaExists(ctx context.Context, querier pgxtype.Querier, db string) (bool, error) {
func schemaExists(ctx context.Context, tx pgx.Tx, db string) (bool, error) {
sql := `SELECT nspname FROM pg_catalog.pg_namespace WHERE nspname = $1`
rows, err := querier.Query(ctx, sql, db)
rows, err := tx.Query(ctx, sql, db)
if err != nil {
return false, lazyerrors.Error(err)
}
Expand Down