Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CBG-2201: Added LeakyBucket support to the rest tester under integration tests #5679

Merged
merged 10 commits into from Aug 25, 2022
5 changes: 5 additions & 0 deletions base/main_test_bucket_pool.go
Expand Up @@ -400,11 +400,16 @@ func (tbp *TestBucketPool) printStats() {
tbp.Logf(ctx, "==========================")

tbp.unclosedBucketsLock.Lock()
unclosedBucketWarnings := ""
for testName, buckets := range tbp.unclosedBuckets {
for bucketName := range buckets {
tbp.Logf(ctx, "WARNING: %s left %s bucket unclosed!", testName, bucketName)
unclosedBucketWarnings += fmt.Sprintf("%s left %s bucket unclosed!\n", testName, bucketName)
}
}
if unclosedBucketWarnings != "" {
panic(unclosedBucketWarnings)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't recall the background for this change - is this to better catch this scenario pre-merge?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Originally, this was put in when a Leaky TestBucket was to be passed in to the RT in-order to make sure the bucket was closed manually at the end of the test and can't be missed. I left this change in (even though a TB is no longer passed in) because it would be useful in general as it is quite easy to miss a bucket being left unclosed when not going through all the test logs.

}
tbp.unclosedBucketsLock.Unlock()

tbp.verbose.Set(origVerbose)
Expand Down
18 changes: 14 additions & 4 deletions db/database.go
Expand Up @@ -259,15 +259,17 @@ func connectToBucketErrorHandling(spec base.BucketSpec, gotErr error) (fatalErro
return false, nil
}

// ConnectToBucketFailFast opens a Couchbase connect and return a specific bucket without retrying on failure.
func ConnectToBucketFailFast(spec base.BucketSpec) (bucket base.Bucket, err error) {
type OpenBucketFn func(spec base.BucketSpec) (base.Bucket, error)

// connectToBucketFailFast opens a Couchbase connect and return a specific bucket without retrying on failure.
func connectToBucketFailFast(spec base.BucketSpec) (bucket base.Bucket, err error) {
bucket, err = base.GetBucket(spec)
_, err = connectToBucketErrorHandling(spec, err)
return bucket, err
}

// ConnectToBucket opens a Couchbase connection and return a specific bucket.
func ConnectToBucket(spec base.BucketSpec) (base.Bucket, error) {
// connectToBucket opens a Couchbase connection and return a specific bucket.
func connectToBucket(spec base.BucketSpec) (base.Bucket, error) {

// start a retry loop to connect to the bucket backing off double the delay each time
worker := func() (bool, error, interface{}) {
Expand All @@ -294,6 +296,14 @@ func ConnectToBucket(spec base.BucketSpec) (base.Bucket, error) {
return ibucket.(base.Bucket), nil
}

// GetConnectToBucketFn returns a different OpenBucketFn to connect to the bucket depending on the value of failFast
func GetConnectToBucketFn(failFast bool) OpenBucketFn {
if failFast {
return connectToBucketFailFast
}
return connectToBucket
}

// Function type for something that calls NewDatabaseContext and wants a callback when the DB is detected
// to come back online. A rest.ServerContext package cannot be passed since it would introduce a circular dependency
type DBOnlineCallback func(dbContext *DatabaseContext)
Expand Down
4 changes: 2 additions & 2 deletions db/database_test.go
Expand Up @@ -1718,7 +1718,7 @@ func BenchmarkDatabase(b *testing.B) {
base.DisableTestLogging(b)

for i := 0; i < b.N; i++ {
bucket, _ := ConnectToBucket(base.BucketSpec{
bucket, _ := connectToBucket(base.BucketSpec{
Server: base.UnitTestUrl(),
CouchbaseDriver: base.ChooseCouchbaseDriver(base.DataBucket),
BucketName: fmt.Sprintf("b-%d", i)})
Expand All @@ -1735,7 +1735,7 @@ func BenchmarkDatabase(b *testing.B) {
func BenchmarkPut(b *testing.B) {
base.DisableTestLogging(b)

bucket, _ := ConnectToBucket(base.BucketSpec{
bucket, _ := connectToBucket(base.BucketSpec{
Server: base.UnitTestUrl(),
CouchbaseDriver: base.ChooseCouchbaseDriver(base.DataBucket),
BucketName: "Bucket"})
Expand Down
2 changes: 1 addition & 1 deletion rest/api.go
Expand Up @@ -215,7 +215,7 @@ func (h *handler) handleFlush() error {
}

// Manually re-open a temporary bucket connection just for flushing purposes
tempBucketForFlush, err := db.ConnectToBucket(spec)
tempBucketForFlush, err := db.GetConnectToBucketFn(false)(spec)
if err != nil {
return err
}
Expand Down
29 changes: 13 additions & 16 deletions rest/blip_api_collections_test.go
Expand Up @@ -13,10 +13,13 @@ import (
)

func TestBlipGetCollections(t *testing.T) {
// FIXME as part of CBG-2203 to enable subtest checkpointExistsWithErrorInNonDefaultCollection
base.TestRequiresCollections(t)

base.SetUpTestLogging(t, base.LevelDebug, base.KeyAll)

//checkpointIDWithError := "checkpointError"

rt := NewRestTester(t, &RestTesterConfig{
guestEnabled: true,
DatabaseConfig: &DatabaseConfig{
Expand All @@ -31,21 +34,16 @@ func TestBlipGetCollections(t *testing.T) {
},
},
createScopesAndCollections: true,
// This code will not work until leaky bucket works with collections CBG-2201
// TestBucket: base.GetTestBucket(t).LeakyBucketClone(base.LeakyBucketConfig{}),
//leakyBucketConfig: &base.LeakyBucketConfig{
// GetRawCallback: func(key string) error {
// if key == db.CheckpointDocIDPrefix+checkpointIDWithError {
// return fmt.Errorf("a unique error")
// }
// return nil
// },
//},
})

defer rt.Close()
// This code will not work until leaky bucket works with collections CBG-2201
// checkpointIDWithError := "checkpointError"
// leakyBucket, ok := base.AsLeakyBucket(rt.Bucket())
// require.True(t, ok)
// leakyBucket.SetGetRawCallback(func(key string) error {
// if key == db.CheckpointDocIDPrefix+checkpointIDWithError {
// return fmt.Errorf("a unique error")
// }
// return nil
// })

btc, err := NewBlipTesterClientOptsWithRT(t, rt, nil)
require.NoError(t, err)
Expand Down Expand Up @@ -119,16 +117,15 @@ func TestBlipGetCollections(t *testing.T) {
resultBody: []db.Body{db.Body{}},
errorCode: "",
},
// This code will not work until leaky bucket works with collections CBG-2201
// {
//{
// name: "checkpointExistsWithErrorInNonDefaultCollection",
// requestBody: db.GetCollectionsRequestBody{
// CheckpointIDs: []string{checkpointIDWithError},
// Collections: []string{"fooScope.fooCollection"},
// },
// resultBody: []db.Body{nil},
// errorCode: "",
// },
//},
}

for _, testCase := range testCases {
Expand Down
23 changes: 10 additions & 13 deletions rest/server_context.go
Expand Up @@ -72,7 +72,7 @@ type bootstrapContext struct {
func (sc *ServerContext) CreateLocalDatabase(dbs DbConfigMap) error {
for _, dbConfig := range dbs {
dbc := dbConfig.ToDatabaseConfig()
_, err := sc._getOrAddDatabaseFromConfig(*dbc, false, false)
_, err := sc._getOrAddDatabaseFromConfig(*dbc, false, db.GetConnectToBucketFn(false))
if err != nil {
return err
}
Expand Down Expand Up @@ -297,7 +297,7 @@ func (sc *ServerContext) PostUpgrade(preview bool) (postUpgradeResults PostUpgra
func (sc *ServerContext) _reloadDatabase(reloadDbName string, failFast bool) (*db.DatabaseContext, error) {
sc._unloadDatabase(reloadDbName)
config := sc.dbConfigs[reloadDbName]
return sc._getOrAddDatabaseFromConfig(*config, true, failFast)
return sc._getOrAddDatabaseFromConfig(*config, true, db.GetConnectToBucketFn(failFast))
}

// Removes and re-adds a database to the ServerContext.
Expand All @@ -318,18 +318,18 @@ func (sc *ServerContext) ReloadDatabaseWithConfig(config DatabaseConfig) error {

func (sc *ServerContext) _reloadDatabaseWithConfig(config DatabaseConfig, failFast bool) error {
sc._removeDatabase(config.Name)
_, err := sc._getOrAddDatabaseFromConfig(config, false, failFast)
_, err := sc._getOrAddDatabaseFromConfig(config, false, db.GetConnectToBucketFn(failFast))
return err
}

// Adds a database to the ServerContext. Attempts a read after it gets the write
// lock to see if it's already been added by another process. If so, returns either the
// existing DatabaseContext or an error based on the useExisting flag.
func (sc *ServerContext) getOrAddDatabaseFromConfig(config DatabaseConfig, useExisting bool, failFast bool) (*db.DatabaseContext, error) {
func (sc *ServerContext) getOrAddDatabaseFromConfig(config DatabaseConfig, useExisting bool, openBucketFn db.OpenBucketFn) (*db.DatabaseContext, error) {
// Obtain write lock during add database, to avoid race condition when creating based on ConfigServer
sc.lock.Lock()
defer sc.lock.Unlock()
return sc._getOrAddDatabaseFromConfig(config, useExisting, failFast)
return sc._getOrAddDatabaseFromConfig(config, useExisting, openBucketFn)
}

func GetBucketSpec(config *DatabaseConfig, serverConfig *StartupConfig) (spec base.BucketSpec, err error) {
Expand Down Expand Up @@ -367,7 +367,8 @@ func GetBucketSpec(config *DatabaseConfig, serverConfig *StartupConfig) (spec ba
// Adds a database to the ServerContext. Attempts a read after it gets the write
// lock to see if it's already been added by another process. If so, returns either the
// existing DatabaseContext or an error based on the useExisting flag.
func (sc *ServerContext) _getOrAddDatabaseFromConfig(config DatabaseConfig, useExisting, failFast bool) (*db.DatabaseContext, error) {
// Pass in a bucketFromBucketSpecFn to replace the default ConnectToBucket function. This will cause the failFast argument to be ignored
func (sc *ServerContext) _getOrAddDatabaseFromConfig(config DatabaseConfig, useExisting bool, openBucketFn db.OpenBucketFn) (*db.DatabaseContext, error) {

// Generate bucket spec and validate whether db already exists
spec, err := GetBucketSpec(&config, sc.config)
Expand Down Expand Up @@ -399,11 +400,7 @@ func (sc *ServerContext) _getOrAddDatabaseFromConfig(config DatabaseConfig, useE
// Connect to bucket
base.InfofCtx(context.TODO(), base.KeyAll, "Opening db /%s as bucket %q, pool %q, server <%s>",
base.MD(dbName), base.MD(spec.BucketName), base.SD(base.DefaultPool), base.SD(spec.Server))
connectToBucketFn := db.ConnectToBucket
if failFast {
connectToBucketFn = db.ConnectToBucketFailFast
}
bucket, err := connectToBucketFn(spec)
bucket, err := openBucketFn(spec)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -949,13 +946,13 @@ func (sc *ServerContext) initEventHandlers(dbcontext *db.DatabaseContext, config
// Adds a database to the ServerContext given its configuration. If an existing config is found
// for the name, returns an error.
func (sc *ServerContext) AddDatabaseFromConfig(config DatabaseConfig) (*db.DatabaseContext, error) {
return sc.getOrAddDatabaseFromConfig(config, false, false)
return sc.getOrAddDatabaseFromConfig(config, false, db.GetConnectToBucketFn(false))
}

// AddDatabaseFromConfigFailFast adds a database to the ServerContext given its configuration and fails fast.
// If an existing config is found for the name, returns an error.
func (sc *ServerContext) AddDatabaseFromConfigFailFast(config DatabaseConfig) (*db.DatabaseContext, error) {
return sc.getOrAddDatabaseFromConfig(config, false, true)
return sc.getOrAddDatabaseFromConfig(config, false, db.GetConnectToBucketFn(true))
}

func (sc *ServerContext) processEventHandlersForEvent(events []*EventConfig, eventType db.EventType, dbcontext *db.DatabaseContext) error {
Expand Down
10 changes: 6 additions & 4 deletions rest/server_context_test.go
Expand Up @@ -21,6 +21,8 @@ import (
"testing"
"time"

"github.com/couchbase/sync_gateway/db"

"github.com/couchbase/gocbcore/v10/connstr"
"github.com/couchbase/sync_gateway/base"
"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -162,7 +164,7 @@ func TestGetOrAddDatabaseFromConfig(t *testing.T) {

// Get or add database name from config without valid database name; throws 400 Illegal database name error
dbConfig := DbConfig{OldRevExpirySeconds: &oldRevExpirySeconds, LocalDocExpirySecs: &localDocExpirySecs}
dbContext, err := serverContext._getOrAddDatabaseFromConfig(DatabaseConfig{DbConfig: dbConfig}, false, false)
dbContext, err := serverContext._getOrAddDatabaseFromConfig(DatabaseConfig{DbConfig: dbConfig}, false, db.GetConnectToBucketFn(false))
assert.Nil(t, dbContext, "Can't create database context without a valid database name")
assert.Error(t, err, "It should throw 400 Illegal database name")
assert.Contains(t, err.Error(), strconv.Itoa(http.StatusBadRequest))
Expand All @@ -181,7 +183,7 @@ func TestGetOrAddDatabaseFromConfig(t *testing.T) {
BucketConfig: BucketConfig{Server: &server, Bucket: &bucketName},
}

dbContext, err = serverContext._getOrAddDatabaseFromConfig(DatabaseConfig{DbConfig: dbConfig}, false, false)
dbContext, err = serverContext._getOrAddDatabaseFromConfig(DatabaseConfig{DbConfig: dbConfig}, false, db.GetConnectToBucketFn(false))
assert.Nil(t, dbContext, "Can't create database context from config with unrecognized value for import_docs")
assert.Error(t, err, "It should throw Unrecognized value for import_docs")

Expand All @@ -208,14 +210,14 @@ func TestGetOrAddDatabaseFromConfig(t *testing.T) {
AutoImport: false,
}

dbContext, err = serverContext._getOrAddDatabaseFromConfig(DatabaseConfig{DbConfig: dbConfig}, false, false)
dbContext, err = serverContext._getOrAddDatabaseFromConfig(DatabaseConfig{DbConfig: dbConfig}, false, db.GetConnectToBucketFn(false))
assert.Nil(t, dbContext, "Can't create database context with duplicate database name")
assert.Error(t, err, "It should throw 412 Duplicate database names")
assert.Contains(t, err.Error(), strconv.Itoa(http.StatusPreconditionFailed))

// Get or add database from config with duplicate database name and useExisting as true
// Existing database context should be returned
dbContext, err = serverContext._getOrAddDatabaseFromConfig(DatabaseConfig{DbConfig: dbConfig}, true, false)
dbContext, err = serverContext._getOrAddDatabaseFromConfig(DatabaseConfig{DbConfig: dbConfig}, true, db.GetConnectToBucketFn(false))
assert.NoError(t, err, "No error while trying to get the existing database name")
assert.Equal(t, server, dbContext.BucketSpec.Server)
assert.Equal(t, bucketName, dbContext.BucketSpec.BucketName)
Expand Down
78 changes: 67 additions & 11 deletions rest/utilities_testing.go
Expand Up @@ -43,14 +43,15 @@ import (

// RestTesterConfig represents configuration for sync gateway
type RestTesterConfig struct {
guestEnabled bool // If this is true, Admin Party is in full effect
SyncFn string // put the sync() function source in here (optional)
DatabaseConfig *DatabaseConfig // Supports additional config options. BucketConfig, Name, Sync, Unsupported will be ignored (overridden)
InitSyncSeq uint64 // If specified, initializes _sync:seq on bucket creation. Not supported when running against walrus
EnableNoConflictsMode bool // Enable no-conflicts mode. By default, conflicts will be allowed, which is the default behavior
TestBucket *base.TestBucket // If set, use this bucket instead of requesting a new one.
adminInterface string // adminInterface overrides the default admin interface.
sgReplicateEnabled bool // sgReplicateManager disabled by default for RestTester
guestEnabled bool // If this is true, Admin Party is in full effect
SyncFn string // put the sync() function source in here (optional)
DatabaseConfig *DatabaseConfig // Supports additional config options. BucketConfig, Name, Sync, Unsupported will be ignored (overridden)
InitSyncSeq uint64 // If specified, initializes _sync:seq on bucket creation. Not supported when running against walrus
EnableNoConflictsMode bool // Enable no-conflicts mode. By default, conflicts will be allowed, which is the default behavior
TestBucket *base.TestBucket // If set, use this bucket instead of requesting a new one.
leakyBucketConfig *base.LeakyBucketConfig // Set to create and use a leaky bucket on the RT and DB. A test bucket cannot be passed in if using this option.
adminInterface string // adminInterface overrides the default admin interface.
sgReplicateEnabled bool // sgReplicateManager disabled by default for RestTester
hideProductInfo bool
adminInterfaceAuthentication bool
metricsInterfaceAuthentication bool
Expand Down Expand Up @@ -92,7 +93,6 @@ func NewRestTester(tb testing.TB, restConfig *RestTesterConfig) *RestTester {
}

func (rt *RestTester) Bucket() base.Bucket {

if rt.tb == nil {
panic("RestTester not properly initialized please use NewRestTester function")
} else if rt.closed {
Expand All @@ -107,6 +107,14 @@ func (rt *RestTester) Bucket() base.Bucket {
testBucket := rt.RestTesterConfig.TestBucket
if testBucket == nil {
testBucket = base.GetTestBucket(rt.tb)
if rt.leakyBucketConfig != nil {
leakyConfig := *rt.leakyBucketConfig
// Ignore closures to avoid double closing panics
leakyConfig.IgnoreClose = true
testBucket = testBucket.LeakyBucketClone(leakyConfig)
}
} else if rt.leakyBucketConfig != nil {
rt.tb.Fatalf("A passed in TestBucket cannot be used on the RestTester when defining a leakyBucketConfig")
}
rt.testBucket = testBucket

Expand Down Expand Up @@ -232,7 +240,34 @@ func (rt *RestTester) Bucket() base.Bucket {

rt.DatabaseConfig.SGReplicateEnabled = base.BoolPtr(rt.RestTesterConfig.sgReplicateEnabled)

_, err = rt.RestTesterServerContext.AddDatabaseFromConfig(*rt.DatabaseConfig)
if rt.leakyBucketConfig != nil {
// Scopes and collections have to be set on the bucket being passed in for the db to use.
// WIP: Collections Phase 1 - Grab just one scope/collection from the defined set.
// Phase 2 (multi collection) means DatabaseContext needs a set of BucketSpec/Collections, not just one...
var scope, collection *string
for scopeName, scopeConfig := range rt.RestTesterConfig.DatabaseConfig.Scopes {
scope = &scopeName
for collectionName := range scopeConfig.Collections {
collection = &collectionName
break
}
}
if scope != nil && collection != nil {
collectionBucket, err := base.AsCollection(testBucket.Bucket)
if err != nil {
rt.tb.Fatalf("Could not get collection from bucket with type %T: %v", testBucket.Bucket, err)
}

collectionBucket.Spec.Scope = scope
collectionBucket.Spec.Collection = collection
collectionBucket.Collection = collectionBucket.Collection.Bucket().Scope(*scope).Collection(*collection)
}

_, err = rt.RestTesterServerContext.AddDatabaseFromConfigWithBucket(rt.tb, *rt.DatabaseConfig, testBucket.Bucket)
IsaacLambat marked this conversation as resolved.
Show resolved Hide resolved
} else {
_, err = rt.RestTesterServerContext.AddDatabaseFromConfig(*rt.DatabaseConfig)
}

if err != nil {
rt.tb.Fatalf("Error from AddDatabaseFromConfig: %v", err)
}
Expand All @@ -251,10 +286,23 @@ func (rt *RestTester) Bucket() base.Bucket {

// PostStartup (without actually waiting 5 seconds)
close(rt.RestTesterServerContext.hasStarted)

return rt.testBucket.Bucket
}

// LeakyBucket gets the bucket from the RestTester as a leaky bucket allowing for callbacks to be set on the fly.
// The RestTester must have been set up to create and use a leaky bucket by setting leakyBucketConfig in the RT
// config when calling NewRestTester.
func (rt *RestTester) LeakyBucket() *base.LeakyBucket {
if rt.leakyBucketConfig == nil {
rt.tb.Fatalf("Cannot get leaky bucket when leakyBucketConfig was not set on RestTester initialisation")
}
leakyBucket, ok := base.AsLeakyBucket(rt.Bucket())
if !ok {
rt.tb.Fatalf("Could not get bucket (type %T) as a leaky bucket", rt.Bucket())
}
return leakyBucket
}

func (rt *RestTester) ServerContext() *ServerContext {
rt.Bucket()
return rt.RestTesterServerContext
Expand Down Expand Up @@ -852,6 +900,14 @@ func (s *SlowResponseRecorder) Write(buf []byte) (int, error) {
return numBytesWritten, err
}

// AddDatabaseFromConfigWithBucket adds a database to the ServerContext and sets a specific bucket on the database context.
// If an existing config is found for the name, returns an error.
func (sc *ServerContext) AddDatabaseFromConfigWithBucket(tb testing.TB, config DatabaseConfig, bucket base.Bucket) (*db.DatabaseContext, error) {
return sc.getOrAddDatabaseFromConfig(config, false, func(spec base.BucketSpec) (base.Bucket, error) {
return bucket, nil
})
}

// The parameters used to create a BlipTester
type BlipTesterSpec struct {

Expand Down