Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix stress tests #3502

Merged
merged 2 commits into from
Oct 5, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions integration/commands_diagnostic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -421,9 +421,9 @@ func TestCommandWhatsMyURIConnection(t *testing.T) {
t.Run("SameClientStress", func(t *testing.T) {
t.Parallel()

ports := make(chan string, teststress.NumGoroutines)
ports := make(chan string, 10)

teststress.Stress(t, func(ready chan<- struct{}, start <-chan struct{}) {
teststress.StressN(t, len(ports), func(ready chan<- struct{}, start <-chan struct{}) {
ready <- struct{}{}
<-start

Expand Down
4 changes: 2 additions & 2 deletions internal/backends/sqlite/metadata/pool/pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ func TestCreateDropStress(t *testing.T) {

var i atomic.Int32

teststress.Stress(t, func(ready chan<- struct{}, start <-chan struct{}) {
n := teststress.Stress(t, func(ready chan<- struct{}, start <-chan struct{}) {
dbName := fmt.Sprintf("db_%03d", i.Add(1))

t.Cleanup(func() {
Expand Down Expand Up @@ -137,7 +137,7 @@ func TestCreateDropStress(t *testing.T) {
require.Nil(t, db)
})

require.Equal(t, int32(teststress.NumGoroutines), i.Load())
require.Equal(t, int32(n), i.Load())
})
}
}
Expand Down
22 changes: 11 additions & 11 deletions internal/backends/sqlite/metadata/registry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ func TestCreateDropStress(t *testing.T) {
// Otherwise, the test might fail with "database schema has changed".
// That error code is SQLITE_SCHEMA (17).
// See https://www.sqlite.org/rescode.html#schema and https://www.sqlite.org/compile.html#max_schema_retry
require.Less(t, teststress.NumGoroutines, 50)
const n = 50

ctx := testutil.Ctx(t)

Expand Down Expand Up @@ -132,7 +132,7 @@ func TestCreateDropStress(t *testing.T) {

var i atomic.Int32

teststress.Stress(t, func(ready chan<- struct{}, start <-chan struct{}) {
teststress.StressN(t, n, func(ready chan<- struct{}, start <-chan struct{}) {
collectionName := fmt.Sprintf("collection_%03d", i.Add(1))

ready <- struct{}{}
Expand All @@ -141,7 +141,7 @@ func TestCreateDropStress(t *testing.T) {
testCollection(t, ctx, r, db, dbName, collectionName)
})

require.Equal(t, int32(teststress.NumGoroutines), i.Load())
require.Equal(t, int32(n), i.Load())
})
}
}
Expand All @@ -150,7 +150,7 @@ func TestCreateSameStress(t *testing.T) {
// Otherwise, the test might fail with "database schema has changed".
// That error code is SQLITE_SCHEMA (17).
// See https://www.sqlite.org/rescode.html#schema and https://www.sqlite.org/compile.html#max_schema_retry
require.Less(t, teststress.NumGoroutines, 50)
const n = 50

ctx := testutil.Ctx(t)

Expand Down Expand Up @@ -183,7 +183,7 @@ func TestCreateSameStress(t *testing.T) {

var i, createdTotal atomic.Int32

teststress.Stress(t, func(ready chan<- struct{}, start <-chan struct{}) {
teststress.StressN(t, n, func(ready chan<- struct{}, start <-chan struct{}) {
id := i.Add(1)

ready <- struct{}{}
Expand Down Expand Up @@ -213,7 +213,7 @@ func TestCreateSameStress(t *testing.T) {
require.NoError(t, err)
})

require.Equal(t, int32(teststress.NumGoroutines), i.Load())
require.Equal(t, int32(n), i.Load())
require.Equal(t, int32(1), createdTotal.Load())
})
}
Expand All @@ -223,7 +223,7 @@ func TestDropSameStress(t *testing.T) {
// Otherwise, the test might fail with "database schema has changed".
// That error code is SQLITE_SCHEMA (17).
// See https://www.sqlite.org/rescode.html#schema and https://www.sqlite.org/compile.html#max_schema_retry
require.Less(t, teststress.NumGoroutines, 50)
const n = 50

ctx := testutil.Ctx(t)

Expand Down Expand Up @@ -260,7 +260,7 @@ func TestDropSameStress(t *testing.T) {

var droppedTotal atomic.Int32

teststress.Stress(t, func(ready chan<- struct{}, start <-chan struct{}) {
teststress.StressN(t, n, func(ready chan<- struct{}, start <-chan struct{}) {
ready <- struct{}{}
<-start

Expand All @@ -280,7 +280,7 @@ func TestCreateDropSameStress(t *testing.T) {
// Otherwise, the test might fail with "database schema has changed".
// That error code is SQLITE_SCHEMA (17).
// See https://www.sqlite.org/rescode.html#schema and https://www.sqlite.org/compile.html#max_schema_retry
require.Less(t, teststress.NumGoroutines, 50)
const n = 50

ctx := testutil.Ctx(t)

Expand Down Expand Up @@ -313,7 +313,7 @@ func TestCreateDropSameStress(t *testing.T) {

var i, createdTotal, droppedTotal atomic.Int32

teststress.Stress(t, func(ready chan<- struct{}, start <-chan struct{}) {
teststress.StressN(t, n, func(ready chan<- struct{}, start <-chan struct{}) {
id := i.Add(1)

ready <- struct{}{}
Expand All @@ -334,7 +334,7 @@ func TestCreateDropSameStress(t *testing.T) {
}
})

require.Equal(t, int32(teststress.NumGoroutines), i.Load())
require.Equal(t, int32(n), i.Load())
require.Less(t, int32(1), createdTotal.Load())
require.Less(t, int32(1), droppedTotal.Load())
})
Expand Down
24 changes: 17 additions & 7 deletions internal/util/testutil/teststress/stress.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,26 +25,36 @@ import (
"github.com/FerretDB/FerretDB/internal/util/testutil/testtb"
)

// NumGoroutines is the total count of goroutines created in Stress function.
var NumGoroutines = runtime.GOMAXPROCS(-1) * 10

// Stress runs function f in multiple goroutines.
// It returns the number of created goroutines.
//
// Function f should do a needed setup, send a message to ready channel when it is ready to start,
// wait for start channel to be closed, and then do the actual work.
func Stress(tb testtb.TB, f func(ready chan<- struct{}, start <-chan struct{})) {
func Stress(tb testtb.TB, f func(ready chan<- struct{}, start <-chan struct{})) int {
tb.Helper()

n := runtime.GOMAXPROCS(-1) * 10
StressN(tb, n, f)

return n
}

// StressN is a variant of Stress that allows to specify the number of goroutines.
//
// Prefer using Stress when possible.
func StressN(tb testtb.TB, n int, f func(ready chan<- struct{}, start <-chan struct{})) {
tb.Helper()

// do a bit more work to reduce a chance that one goroutine would finish
// before the other one is still being created
var wg sync.WaitGroup
readyCh := make(chan struct{}, NumGoroutines)
readyCh := make(chan struct{}, n)
startCh := make(chan struct{})

ctx, cancel := context.WithCancel(context.Background())
tb.Cleanup(cancel)

for i := 0; i < NumGoroutines; i++ {
for i := 0; i < n; i++ {
wg.Add(1)

go func() {
Expand All @@ -65,7 +75,7 @@ func Stress(tb testtb.TB, f func(ready chan<- struct{}, start <-chan struct{}))
}()
}

for i := 0; i < NumGoroutines; i++ {
for i := 0; i < n; i++ {
select {
case _, ok := <-readyCh:
if !ok {
Expand Down
Loading