Skip to content

Commit

Permalink
Add stress tests for settings table and fix simple issues with transa…
Browse files Browse the repository at this point in the history
…ctions (#1316)

Closes #1282.
  • Loading branch information
rumyantseva committed Oct 31, 2022
1 parent b5b52cf commit f4974a5
Show file tree
Hide file tree
Showing 5 changed files with 176 additions and 17 deletions.
100 changes: 100 additions & 0 deletions integration/create_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,14 @@
package integration

import (
"errors"
"fmt"
"runtime"
"strings"
"sync"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
Expand All @@ -26,6 +31,101 @@ import (
"github.com/FerretDB/FerretDB/integration/setup"
)

func TestCreateStress(t *testing.T) {
setup.SkipForPostgresWithReason(t, "https://github.com/FerretDB/FerretDB/issues/1206")

t.Parallel()

ctx, collection := setup.Setup(t) // no providers there, we will create collections concurrently
db := collection.Database()

collNum := runtime.GOMAXPROCS(-1) * 10

ready := make(chan struct{}, collNum)
start := make(chan struct{})

var wg sync.WaitGroup
for i := 0; i < collNum; i++ {
wg.Add(1)

go func(i int) {
defer wg.Done()

ready <- struct{}{}

<-start

collName := fmt.Sprintf("stress_%d", i)

schema := fmt.Sprintf(`{
"title": "%s",
"description": "Create Collection Stress %d",
"primary_key": ["_id"],
"properties": {
"_id": {"type": "string"},
"v": {"type": "string"}
}
}`, collName, i,
)
opts := options.CreateCollectionOptions{
Validator: bson.D{{"$tigrisSchemaString", schema}},
}

// Attempt to create a collection for Tigris with a schema.
// If we get an error, that's MongoDB (FerretDB ignores that argument for non-Tigris handlers),
// so we create collection without schema.
err := db.CreateCollection(ctx, collName, &opts)
if err != nil {
var cmdErr *mongo.CommandError
if errors.As(err, &cmdErr) {
if strings.Contains(cmdErr.Message, `unknown top level operator: $tigrisSchemaString`) {
err = db.CreateCollection(ctx, collName)
}
}

assert.NoError(t, err)
}

_, err = db.Collection(collName).InsertOne(ctx, bson.D{{"_id", "foo"}, {"v", "bar"}})

assert.NoError(t, err)
}(i)
}

for i := 0; i < collNum; i++ {
<-ready
}

close(start)

wg.Wait()

colls, err := db.ListCollectionNames(ctx, bson.D{})
require.NoError(t, err)

// TODO https://github.com/FerretDB/FerretDB/issues/1206
// Without SkipForPostgres this test would fail.
// Even though all the collections are created as separate tables in the database,
// the settings table doesn't store all of them because of concurrency issues.
require.Len(t, colls, collNum)

// check that all collections were created, and we can query them
for i := 0; i < collNum; i++ {
i := i

t.Run(fmt.Sprintf("check_stress_%d", i), func(t *testing.T) {
t.Parallel()

collName := fmt.Sprintf("stress_%d", i)

var doc bson.D
err := db.Collection(collName).FindOne(ctx, bson.D{{"_id", "foo"}}).Decode(&doc)
require.NoError(t, err)
require.Equal(t, bson.D{{"_id", "foo"}, {"v", "bar"}}, doc)
})
}
}

func TestCreateTigris(t *testing.T) {
setup.SkipForPostgresWithReason(t, "Tigris-specific schema is used")

Expand Down
31 changes: 25 additions & 6 deletions internal/handlers/pg/msg_create.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,25 +72,44 @@ func (h *Handler) MsgCreate(ctx context.Context, msg *wire.OpMsg) (*wire.OpMsg,
return nil, err
}

// We use two separate transactions as there is a case when a query of the first transaction could fail,
// and we should consider it normal: it could happen if we attempt to create databases from two parallel requests.
// One of such requests will fail as the database was already created from another request,
// but it's a normal situation, and we should create both collections in such a case.

err = h.PgPool.InTransaction(ctx, func(tx pgx.Tx) error {
if err := pgdb.CreateDatabaseIfNotExists(ctx, tx, db); err != nil {
if errors.Is(pgdb.ErrInvalidDatabaseName, err) {
switch {
case errors.Is(err, pgdb.ErrAlreadyExist):
// If the DB was created from a parallel query, it's ok.
// However, in this case one of the transaction queries failed,
// so we need to rollback the transaction.
return pgdb.ErrAlreadyExist
case errors.Is(pgdb.ErrInvalidDatabaseName, err):
msg := fmt.Sprintf("Invalid namespace: %s.%s", db, collection)
return common.NewErrorMsg(common.ErrInvalidNamespace, msg)
default:
return lazyerrors.Error(err)
}
return lazyerrors.Error(err)
}
return nil
})
if err != nil && !errors.Is(err, pgdb.ErrAlreadyExist) {
return nil, err
}

err = h.PgPool.InTransaction(ctx, func(tx pgx.Tx) error {
if err := pgdb.CreateCollection(ctx, tx, db, collection); err != nil {
if errors.Is(err, pgdb.ErrAlreadyExist) {
switch {
case errors.Is(err, pgdb.ErrAlreadyExist):
msg := fmt.Sprintf("Collection %s.%s already exists.", db, collection)
return common.NewErrorMsg(common.ErrNamespaceExists, msg)
}
if errors.Is(err, pgdb.ErrInvalidTableName) {
case errors.Is(err, pgdb.ErrInvalidTableName):
msg := fmt.Sprintf("Invalid collection name: '%s.%s'", db, collection)
return common.NewErrorMsg(common.ErrInvalidNamespace, msg)
default:
return lazyerrors.Error(err)
}
return lazyerrors.Error(err)
}
return nil
})
Expand Down
4 changes: 4 additions & 0 deletions internal/handlers/pg/pgdb/settings.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,10 @@ func getTableName(ctx context.Context, tx pgx.Tx, db, collection string) (string

// getSettingsTable returns FerretDB settings table.
func getSettingsTable(ctx context.Context, tx pgx.Tx, db string) (*types.Document, error) {
// TODO https://github.com/FerretDB/FerretDB/issues/1206
// `SELECT settings FROM %s FOR UPDATE` could solve the problem with parallel access of the settings table,
// but it locks the row that could be accessed from other places and causes timeouts,
// so we need a faster solution instead.
sql := fmt.Sprintf(`SELECT settings FROM %s`, pgx.Identifier{db, settingsTableName}.Sanitize())
rows, err := tx.Query(ctx, sql)
if err != nil {
Expand Down
38 changes: 27 additions & 11 deletions internal/handlers/tigris/tigrisdb/databases.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ package tigrisdb

import (
"context"
"errors"
"time"

"github.com/tigrisdata/tigris-client-go/driver"

Expand All @@ -37,19 +39,33 @@ func (tdb *TigrisDB) createDatabaseIfNotExists(ctx context.Context, db string) (

// Database does not exist. Try to create it,
// but keep in mind that it can be created in concurrent connection.
err = tdb.Driver.CreateDatabase(ctx, db)
switch err := err.(type) {
case nil:
return true, nil
case *driver.Error:
if IsAlreadyExists(err) {
return false, nil
}
// If we detect that other creation is in flight, we give up to three attempts to create the database.
// TODO https://github.com/FerretDB/FerretDB/issues/1341
for i := 0; i < 3; i++ {
err = tdb.Driver.CreateDatabase(ctx, db)

return false, lazyerrors.Error(err)
default:
return false, lazyerrors.Error(err)
var driverErr *driver.Error

switch {
case err == nil:
return true, nil
case errors.As(err, &driverErr):
if IsAlreadyExists(err) {
return false, nil
}

if isOtherCreationInFlight(err) {
time.Sleep(20 * time.Millisecond)
continue
}

return false, lazyerrors.Error(err)
default:
return false, lazyerrors.Error(err)
}
}

return false, lazyerrors.Error(err)
}

// databaseExists returns true if database exists.
Expand Down
20 changes: 20 additions & 0 deletions internal/handlers/tigris/tigrisdb/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@
package tigrisdb

import (
"errors"
"strings"

"github.com/AlekSi/pointer"
api "github.com/tigrisdata/tigris-client-go/api/server/v1"
"github.com/tigrisdata/tigris-client-go/driver"
Expand All @@ -34,6 +37,23 @@ func IsAlreadyExists(err error) bool {
return pointer.Get(e).Code == api.Code_ALREADY_EXISTS //nolint:nosnakecase // Tigris named their const that way
}

// isOtherCreationInFlight returns true if an attempt to create the database with the given name is already in progress.
// This function is implemented to keep nolint in a single place.
// TODO https://github.com/FerretDB/FerretDB/issues/1341
func isOtherCreationInFlight(err error) bool {
var driverErr *driver.Error
if !errors.As(err, &driverErr) {
panic("isOtherCreationInFlight called with non-driver error")
}

isUnknnown := pointer.Get(driverErr).Code == api.Code_UNKNOWN //nolint:nosnakecase // Tigris named their const that way
if !isUnknnown {
return false
}

return strings.Contains(driverErr.Message, "duplicate key value, violates key constraint")
}

// IsInvalidArgument returns true if the error is "invalid argument" error.
// This function is implemented to keep nolint in a single place.
func IsInvalidArgument(err error) bool {
Expand Down

1 comment on commit f4974a5

@vercel
Copy link

@vercel vercel bot commented on f4974a5 Oct 31, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Successfully deployed to the following URLs:

ferret-db – ./

ferret-db-git-main-ferretdb.vercel.app
ferret-db-ferretdb.vercel.app
ferret-db.vercel.app

Please sign in to comment.