Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add stress tests for settings table and fix simple issues with transactions #1316

Merged
merged 40 commits into from
Oct 31, 2022
Merged
Show file tree
Hide file tree
Changes from 39 commits
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
82a45b2
Lock settings table
AlekSi Oct 19, 2022
4755005
Merge branch 'main' into lock-table
AlekSi Oct 19, 2022
4670e24
Merge branch 'main' into lock-table
AlekSi Oct 23, 2022
73df1d5
Merge branch 'main' into lock-table
rumyantseva Oct 24, 2022
92059f7
parallel tests to demo problems
rumyantseva Oct 24, 2022
800b270
wip
rumyantseva Oct 24, 2022
5505f2b
wip
rumyantseva Oct 24, 2022
83191c1
wip
rumyantseva Oct 24, 2022
a4411e9
wip
rumyantseva Oct 24, 2022
66f764b
wip
rumyantseva Oct 25, 2022
db1a855
rollback stats - in a separate PR
rumyantseva Oct 25, 2022
e38f107
Merge branch 'main' into issue-1282-investigate-tables
mergify[bot] Oct 25, 2022
36adb81
wip
rumyantseva Oct 25, 2022
441794c
Tigris things
rumyantseva Oct 25, 2022
ce526b0
Apply suggestions from code review
rumyantseva Oct 25, 2022
a3db999
wip
rumyantseva Oct 25, 2022
1cca2f8
Merge branch 'main' into issue-1282-investigate-tables
mergify[bot] Oct 25, 2022
a09782c
Merge branch 'issue-1282-investigate-tables' of https://github.com/ru…
rumyantseva Oct 25, 2022
185ee71
Merge branch 'main' into issue-1282-investigate-tables
mergify[bot] Oct 25, 2022
befd734
Merge branch 'main' into issue-1282-investigate-tables
mergify[bot] Oct 25, 2022
e269086
Merge branch 'main' into issue-1282-investigate-tables
mergify[bot] Oct 25, 2022
8e0b6b8
tigris things
rumyantseva Oct 26, 2022
9b31a70
Merge branch 'issue-1282-investigate-tables' of https://github.com/ru…
rumyantseva Oct 26, 2022
e7348d3
tigris things
rumyantseva Oct 26, 2022
85fe3bd
Merge branch 'main' into issue-1282-investigate-tables
mergify[bot] Oct 26, 2022
03bc222
Merge branch 'main' into issue-1282-investigate-tables
mergify[bot] Oct 26, 2022
38aa9de
Merge branch 'main' into issue-1282-investigate-tables
mergify[bot] Oct 26, 2022
bc8a7d3
Merge branch 'main' into issue-1282-investigate-tables
Oct 27, 2022
d65d58b
gomaxprocs
rumyantseva Oct 27, 2022
cf994f0
Merge branch 'main' into issue-1282-investigate-tables
mergify[bot] Oct 27, 2022
807d546
wip
rumyantseva Oct 28, 2022
441b36e
wip
rumyantseva Oct 28, 2022
81095c3
todos
rumyantseva Oct 28, 2022
a3c1deb
Merge branch 'main' into issue-1282-investigate-tables
mergify[bot] Oct 28, 2022
6c65dfe
wording
rumyantseva Oct 28, 2022
fba8f08
Merge branch 'main' into issue-1282-investigate-tables
mergify[bot] Oct 28, 2022
155a8f9
Merge branch 'main' into issue-1282-investigate-tables
mergify[bot] Oct 28, 2022
5a7a766
Merge branch 'main' into issue-1282-investigate-tables
mergify[bot] Oct 31, 2022
0b0c662
Merge branch 'main' into issue-1282-investigate-tables
mergify[bot] Oct 31, 2022
42fa253
Merge branch 'main' into issue-1282-investigate-tables
mergify[bot] Oct 31, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
100 changes: 100 additions & 0 deletions integration/create_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,14 @@
package integration

import (
"errors"
"fmt"
"runtime"
"strings"
"sync"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
Expand All @@ -26,6 +31,101 @@ import (
"github.com/FerretDB/FerretDB/integration/setup"
)

func TestCreateStress(t *testing.T) {
setup.SkipForPostgresWithReason(t, "https://github.com/FerretDB/FerretDB/issues/1206")

t.Parallel()

ctx, collection := setup.Setup(t) // no providers there, we will create collections concurrently
db := collection.Database()

collNum := runtime.GOMAXPROCS(-1) * 10

ready := make(chan struct{}, collNum)
start := make(chan struct{})

var wg sync.WaitGroup
for i := 0; i < collNum; i++ {
wg.Add(1)

go func(i int) {
defer wg.Done()

ready <- struct{}{}

<-start

collName := fmt.Sprintf("stress_%d", i)

schema := fmt.Sprintf(`{
"title": "%s",
"description": "Create Collection Stress %d",
"primary_key": ["_id"],
"properties": {
"_id": {"type": "string"},
"v": {"type": "string"}
}
}`, collName, i,
)
opts := options.CreateCollectionOptions{
Validator: bson.D{{"$tigrisSchemaString", schema}},
}

// Attempt to create a collection for Tigris with a schema.
// If we get an error, that's MongoDB (FerretDB ignores that argument for non-Tigris handlers),
// so we create collection without schema.
err := db.CreateCollection(ctx, collName, &opts)
if err != nil {
AlekSi marked this conversation as resolved.
Show resolved Hide resolved
var cmdErr *mongo.CommandError
if errors.As(err, &cmdErr) {
if strings.Contains(cmdErr.Message, `unknown top level operator: $tigrisSchemaString`) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We have the same comment there and at #1318 (comment), but the implementation of error checking is very different. Which one is correct?

err = db.CreateCollection(ctx, collName)
}
}

assert.NoError(t, err)
}

_, err = db.Collection(collName).InsertOne(ctx, bson.D{{"_id", "foo"}, {"v", "bar"}})

assert.NoError(t, err)
}(i)
}

for i := 0; i < collNum; i++ {
<-ready
}

close(start)

wg.Wait()

colls, err := db.ListCollectionNames(ctx, bson.D{})
require.NoError(t, err)

// TODO https://github.com/FerretDB/FerretDB/issues/1206
// Without SkipForPostgres this test would fail.
// Even though all the collections are created as separate tables in the database,
// the settings table doesn't store all of them because of concurrency issues.
require.Len(t, colls, collNum)

// check that all collections were created, and we can query them
for i := 0; i < collNum; i++ {
i := i

t.Run(fmt.Sprintf("check_stress_%d", i), func(t *testing.T) {
AlekSi marked this conversation as resolved.
Show resolved Hide resolved
t.Parallel()

collName := fmt.Sprintf("stress_%d", i)
AlekSi marked this conversation as resolved.
Show resolved Hide resolved

var doc bson.D
err := db.Collection(collName).FindOne(ctx, bson.D{{"_id", "foo"}}).Decode(&doc)
require.NoError(t, err)
require.Equal(t, bson.D{{"_id", "foo"}, {"v", "bar"}}, doc)
})
}
}

func TestCreateTigris(t *testing.T) {
setup.SkipForPostgresWithReason(t, "Tigris-specific schema is used")

Expand Down
31 changes: 25 additions & 6 deletions internal/handlers/pg/msg_create.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,25 +72,44 @@ func (h *Handler) MsgCreate(ctx context.Context, msg *wire.OpMsg) (*wire.OpMsg,
return nil, err
}

// We use two separate transactions as there is a case when a query of the first transaction could fail,
// and we should consider it normal: it could happen if we attempt to create databases from two parallel requests.
// One of such requests will fail as the database was already created from another request,
// but it's a normal situation, and we should create both collections in such a case.

err = h.PgPool.InTransaction(ctx, func(tx pgx.Tx) error {
if err := pgdb.CreateDatabaseIfNotExists(ctx, tx, db); err != nil {
if errors.Is(pgdb.ErrInvalidDatabaseName, err) {
switch {
case errors.Is(err, pgdb.ErrAlreadyExist):
// If the DB was created from a parallel query, it's ok.
// However, in this case one of the transaction queries failed,
// so we need to rollback the transaction.
rumyantseva marked this conversation as resolved.
Show resolved Hide resolved
return pgdb.ErrAlreadyExist
case errors.Is(pgdb.ErrInvalidDatabaseName, err):
msg := fmt.Sprintf("Invalid namespace: %s.%s", db, collection)
return common.NewErrorMsg(common.ErrInvalidNamespace, msg)
default:
return lazyerrors.Error(err)
}
return lazyerrors.Error(err)
}
return nil
})
if err != nil && !errors.Is(err, pgdb.ErrAlreadyExist) {
return nil, err
}

err = h.PgPool.InTransaction(ctx, func(tx pgx.Tx) error {
if err := pgdb.CreateCollection(ctx, tx, db, collection); err != nil {
if errors.Is(err, pgdb.ErrAlreadyExist) {
switch {
case errors.Is(err, pgdb.ErrAlreadyExist):
msg := fmt.Sprintf("Collection %s.%s already exists.", db, collection)
return common.NewErrorMsg(common.ErrNamespaceExists, msg)
}
if errors.Is(err, pgdb.ErrInvalidTableName) {
case errors.Is(err, pgdb.ErrInvalidTableName):
msg := fmt.Sprintf("Invalid collection name: '%s.%s'", db, collection)
return common.NewErrorMsg(common.ErrInvalidNamespace, msg)
default:
return lazyerrors.Error(err)
}
return lazyerrors.Error(err)
}
return nil
})
Expand Down
4 changes: 4 additions & 0 deletions internal/handlers/pg/pgdb/settings.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,10 @@ func getTableName(ctx context.Context, tx pgx.Tx, db, collection string) (string

// getSettingsTable returns FerretDB settings table.
func getSettingsTable(ctx context.Context, tx pgx.Tx, db string) (*types.Document, error) {
// TODO https://github.com/FerretDB/FerretDB/issues/1206
// `SELECT settings FROM %s FOR UPDATE` could solve the problem with parallel access of the settings table,
// but it locks the row that could be accessed from other places and causes timeouts,
// so we need a faster solution instead.
sql := fmt.Sprintf(`SELECT settings FROM %s`, pgx.Identifier{db, settingsTableName}.Sanitize())
rows, err := tx.Query(ctx, sql)
if err != nil {
Expand Down
38 changes: 27 additions & 11 deletions internal/handlers/tigris/tigrisdb/databases.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ package tigrisdb

import (
"context"
"errors"
"time"

"github.com/tigrisdata/tigris-client-go/driver"

Expand All @@ -37,19 +39,33 @@ func (tdb *TigrisDB) createDatabaseIfNotExists(ctx context.Context, db string) (

// Database does not exist. Try to create it,
// but keep in mind that it can be created in concurrent connection.
err = tdb.Driver.CreateDatabase(ctx, db)
switch err := err.(type) {
case nil:
return true, nil
case *driver.Error:
if IsAlreadyExists(err) {
return false, nil
}
// If we detect that another creation is in flight, we make up to three attempts to create the database.
AlekSi marked this conversation as resolved.
Show resolved Hide resolved
// TODO https://github.com/FerretDB/FerretDB/issues/1341
for i := 0; i < 3; i++ {
err = tdb.Driver.CreateDatabase(ctx, db)

return false, lazyerrors.Error(err)
default:
return false, lazyerrors.Error(err)
var driverErr *driver.Error

switch {
case err == nil:
return true, nil
case errors.As(err, &driverErr):
if IsAlreadyExists(err) {
return false, nil
}

if isOtherCreationInFlight(err) {
time.Sleep(20 * time.Millisecond)
continue
}

return false, lazyerrors.Error(err)
default:
return false, lazyerrors.Error(err)
}
}

return false, lazyerrors.Error(err)
}

// databaseExists returns true if database exists.
Expand Down
20 changes: 20 additions & 0 deletions internal/handlers/tigris/tigrisdb/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@
package tigrisdb

import (
"errors"
"strings"

"github.com/AlekSi/pointer"
api "github.com/tigrisdata/tigris-client-go/api/server/v1"
"github.com/tigrisdata/tigris-client-go/driver"
Expand All @@ -34,6 +37,23 @@ func IsAlreadyExists(err error) bool {
return pointer.Get(e).Code == api.Code_ALREADY_EXISTS //nolint:nosnakecase // Tigris named their const that way
}

// isOtherCreationInFlight reports whether err signals that another attempt
// to create the database with the same name is already in progress.
//
// It panics if err does not contain a *driver.Error.
// The check lives in its own function to keep the nolint directive in a single place.
// TODO https://github.com/FerretDB/FerretDB/issues/1341
func isOtherCreationInFlight(err error) bool {
	var de *driver.Error
	if ok := errors.As(err, &de); !ok {
		panic("isOtherCreationInFlight called with non-driver error")
	}

	// The condition is recognized by the UNKNOWN code combined with a specific message fragment.
	if pointer.Get(de).Code != api.Code_UNKNOWN { //nolint:nosnakecase // Tigris named their const that way
		return false
	}

	return strings.Contains(de.Message, "duplicate key value, violates key constraint")
}

// IsInvalidArgument returns true if the error is "invalid argument" error.
// This function is implemented to keep nolint in a single place.
func IsInvalidArgument(err error) bool {
Expand Down