Skip to content

Commit

Permalink
CBG-2201: Added LeakyBucket support to the rest tester under integrat…
Browse files Browse the repository at this point in the history
…ion tests (#5679)

* CBG-2201: Added LeakyBucket support to the rest tester under integration tests

* Fixed normal buckets double closing causing panic

* Added RT option to use leaky bucket on DB

* Added panic is not using a leaky bucket£

* Addressed PR feedback, change add database function with connect fn to AddDatabaseFromConfigWithBucket

* Added collection support, and use TB on DB if provided in RT if provided

* Address PR feedback

* Rest tester makes leaky bucket when needed

* Modified TestBlipGetCollections to pick up new leaky bucket changes

* Removed createScopesAndCollections
  • Loading branch information
IsaacLambat authored and torcolvin committed Sep 1, 2022
1 parent 0c2faab commit 943dcf4
Show file tree
Hide file tree
Showing 8 changed files with 118 additions and 51 deletions.
5 changes: 5 additions & 0 deletions base/main_test_bucket_pool.go
Expand Up @@ -400,11 +400,16 @@ func (tbp *TestBucketPool) printStats() {
tbp.Logf(ctx, "==========================")

tbp.unclosedBucketsLock.Lock()
unclosedBucketWarnings := ""
for testName, buckets := range tbp.unclosedBuckets {
for bucketName := range buckets {
tbp.Logf(ctx, "WARNING: %s left %s bucket unclosed!", testName, bucketName)
unclosedBucketWarnings += fmt.Sprintf("%s left %s bucket unclosed!\n", testName, bucketName)
}
}
if unclosedBucketWarnings != "" {
panic(unclosedBucketWarnings)
}
tbp.unclosedBucketsLock.Unlock()

tbp.verbose.Set(origVerbose)
Expand Down
18 changes: 14 additions & 4 deletions db/database.go
Expand Up @@ -259,15 +259,17 @@ func connectToBucketErrorHandling(spec base.BucketSpec, gotErr error) (fatalErro
return false, nil
}

// ConnectToBucketFailFast opens a Couchbase connect and return a specific bucket without retrying on failure.
func ConnectToBucketFailFast(spec base.BucketSpec) (bucket base.Bucket, err error) {
type OpenBucketFn func(spec base.BucketSpec) (base.Bucket, error)

// connectToBucketFailFast opens a Couchbase connect and return a specific bucket without retrying on failure.
func connectToBucketFailFast(spec base.BucketSpec) (bucket base.Bucket, err error) {
bucket, err = base.GetBucket(spec)
_, err = connectToBucketErrorHandling(spec, err)
return bucket, err
}

// ConnectToBucket opens a Couchbase connection and return a specific bucket.
func ConnectToBucket(spec base.BucketSpec) (base.Bucket, error) {
// connectToBucket opens a Couchbase connection and return a specific bucket.
func connectToBucket(spec base.BucketSpec) (base.Bucket, error) {

// start a retry loop to connect to the bucket backing off double the delay each time
worker := func() (bool, error, interface{}) {
Expand All @@ -294,6 +296,14 @@ func ConnectToBucket(spec base.BucketSpec) (base.Bucket, error) {
return ibucket.(base.Bucket), nil
}

// GetConnectToBucketFn returns a different OpenBucketFn to connect to the bucket depending on the value of failFast
func GetConnectToBucketFn(failFast bool) OpenBucketFn {
if failFast {
return connectToBucketFailFast
}
return connectToBucket
}

// Function type for something that calls NewDatabaseContext and wants a callback when the DB is detected
// to come back online. A rest.ServerContext package cannot be passed since it would introduce a circular dependency
type DBOnlineCallback func(dbContext *DatabaseContext)
Expand Down
4 changes: 2 additions & 2 deletions db/database_test.go
Expand Up @@ -1718,7 +1718,7 @@ func BenchmarkDatabase(b *testing.B) {
base.DisableTestLogging(b)

for i := 0; i < b.N; i++ {
bucket, _ := ConnectToBucket(base.BucketSpec{
bucket, _ := connectToBucket(base.BucketSpec{
Server: base.UnitTestUrl(),
CouchbaseDriver: base.ChooseCouchbaseDriver(base.DataBucket),
BucketName: fmt.Sprintf("b-%d", i)})
Expand All @@ -1735,7 +1735,7 @@ func BenchmarkDatabase(b *testing.B) {
func BenchmarkPut(b *testing.B) {
base.DisableTestLogging(b)

bucket, _ := ConnectToBucket(base.BucketSpec{
bucket, _ := connectToBucket(base.BucketSpec{
Server: base.UnitTestUrl(),
CouchbaseDriver: base.ChooseCouchbaseDriver(base.DataBucket),
BucketName: "Bucket"})
Expand Down
2 changes: 1 addition & 1 deletion rest/api.go
Expand Up @@ -215,7 +215,7 @@ func (h *handler) handleFlush() error {
}

// Manually re-open a temporary bucket connection just for flushing purposes
tempBucketForFlush, err := db.ConnectToBucket(spec)
tempBucketForFlush, err := db.GetConnectToBucketFn(false)(spec)
if err != nil {
return err
}
Expand Down
29 changes: 13 additions & 16 deletions rest/blip_api_collections_test.go
Expand Up @@ -13,10 +13,13 @@ import (
)

func TestBlipGetCollections(t *testing.T) {
// FIXME as part of CBG-2203 to enable subtest checkpointExistsWithErrorInNonDefaultCollection
base.TestRequiresCollections(t)

base.SetUpTestLogging(t, base.LevelDebug, base.KeyAll)

//checkpointIDWithError := "checkpointError"

rt := NewRestTester(t, &RestTesterConfig{
guestEnabled: true,
DatabaseConfig: &DatabaseConfig{
Expand All @@ -31,21 +34,16 @@ func TestBlipGetCollections(t *testing.T) {
},
},
createScopesAndCollections: true,
// This code will not work until leaky bucket works with collections CBG-2201
// TestBucket: base.GetTestBucket(t).LeakyBucketClone(base.LeakyBucketConfig{}),
//leakyBucketConfig: &base.LeakyBucketConfig{
// GetRawCallback: func(key string) error {
// if key == db.CheckpointDocIDPrefix+checkpointIDWithError {
// return fmt.Errorf("a unique error")
// }
// return nil
// },
//},
})

defer rt.Close()
// This code will not work until leaky bucket works with collections CBG-2201
// checkpointIDWithError := "checkpointError"
// leakyBucket, ok := base.AsLeakyBucket(rt.Bucket())
// require.True(t, ok)
// leakyBucket.SetGetRawCallback(func(key string) error {
// if key == db.CheckpointDocIDPrefix+checkpointIDWithError {
// return fmt.Errorf("a unique error")
// }
// return nil
// })

btc, err := NewBlipTesterClientOptsWithRT(t, rt, nil)
require.NoError(t, err)
Expand Down Expand Up @@ -119,16 +117,15 @@ func TestBlipGetCollections(t *testing.T) {
resultBody: []db.Body{db.Body{}},
errorCode: "",
},
// This code will not work until leaky bucket works with collections CBG-2201
// {
//{
// name: "checkpointExistsWithErrorInNonDefaultCollection",
// requestBody: db.GetCollectionsRequestBody{
// CheckpointIDs: []string{checkpointIDWithError},
// Collections: []string{"fooScope.fooCollection"},
// },
// resultBody: []db.Body{nil},
// errorCode: "",
// },
//},
}

for _, testCase := range testCases {
Expand Down
23 changes: 10 additions & 13 deletions rest/server_context.go
Expand Up @@ -72,7 +72,7 @@ type bootstrapContext struct {
func (sc *ServerContext) CreateLocalDatabase(dbs DbConfigMap) error {
for _, dbConfig := range dbs {
dbc := dbConfig.ToDatabaseConfig()
_, err := sc._getOrAddDatabaseFromConfig(*dbc, false, false)
_, err := sc._getOrAddDatabaseFromConfig(*dbc, false, db.GetConnectToBucketFn(false))
if err != nil {
return err
}
Expand Down Expand Up @@ -297,7 +297,7 @@ func (sc *ServerContext) PostUpgrade(preview bool) (postUpgradeResults PostUpgra
func (sc *ServerContext) _reloadDatabase(reloadDbName string, failFast bool) (*db.DatabaseContext, error) {
sc._unloadDatabase(reloadDbName)
config := sc.dbConfigs[reloadDbName]
return sc._getOrAddDatabaseFromConfig(*config, true, failFast)
return sc._getOrAddDatabaseFromConfig(*config, true, db.GetConnectToBucketFn(failFast))
}

// Removes and re-adds a database to the ServerContext.
Expand All @@ -318,18 +318,18 @@ func (sc *ServerContext) ReloadDatabaseWithConfig(config DatabaseConfig) error {

func (sc *ServerContext) _reloadDatabaseWithConfig(config DatabaseConfig, failFast bool) error {
sc._removeDatabase(config.Name)
_, err := sc._getOrAddDatabaseFromConfig(config, false, failFast)
_, err := sc._getOrAddDatabaseFromConfig(config, false, db.GetConnectToBucketFn(failFast))
return err
}

// Adds a database to the ServerContext. Attempts a read after it gets the write
// lock to see if it's already been added by another process. If so, returns either the
// existing DatabaseContext or an error based on the useExisting flag.
func (sc *ServerContext) getOrAddDatabaseFromConfig(config DatabaseConfig, useExisting bool, failFast bool) (*db.DatabaseContext, error) {
func (sc *ServerContext) getOrAddDatabaseFromConfig(config DatabaseConfig, useExisting bool, openBucketFn db.OpenBucketFn) (*db.DatabaseContext, error) {
// Obtain write lock during add database, to avoid race condition when creating based on ConfigServer
sc.lock.Lock()
defer sc.lock.Unlock()
return sc._getOrAddDatabaseFromConfig(config, useExisting, failFast)
return sc._getOrAddDatabaseFromConfig(config, useExisting, openBucketFn)
}

func GetBucketSpec(config *DatabaseConfig, serverConfig *StartupConfig) (spec base.BucketSpec, err error) {
Expand Down Expand Up @@ -367,7 +367,8 @@ func GetBucketSpec(config *DatabaseConfig, serverConfig *StartupConfig) (spec ba
// Adds a database to the ServerContext. Attempts a read after it gets the write
// lock to see if it's already been added by another process. If so, returns either the
// existing DatabaseContext or an error based on the useExisting flag.
func (sc *ServerContext) _getOrAddDatabaseFromConfig(config DatabaseConfig, useExisting, failFast bool) (*db.DatabaseContext, error) {
// Pass in a bucketFromBucketSpecFn to replace the default ConnectToBucket function. This will cause the failFast argument to be ignored
func (sc *ServerContext) _getOrAddDatabaseFromConfig(config DatabaseConfig, useExisting bool, openBucketFn db.OpenBucketFn) (*db.DatabaseContext, error) {

// Generate bucket spec and validate whether db already exists
spec, err := GetBucketSpec(&config, sc.config)
Expand Down Expand Up @@ -399,11 +400,7 @@ func (sc *ServerContext) _getOrAddDatabaseFromConfig(config DatabaseConfig, useE
// Connect to bucket
base.InfofCtx(context.TODO(), base.KeyAll, "Opening db /%s as bucket %q, pool %q, server <%s>",
base.MD(dbName), base.MD(spec.BucketName), base.SD(base.DefaultPool), base.SD(spec.Server))
connectToBucketFn := db.ConnectToBucket
if failFast {
connectToBucketFn = db.ConnectToBucketFailFast
}
bucket, err := connectToBucketFn(spec)
bucket, err := openBucketFn(spec)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -949,13 +946,13 @@ func (sc *ServerContext) initEventHandlers(dbcontext *db.DatabaseContext, config
// Adds a database to the ServerContext given its configuration. If an existing config is found
// for the name, returns an error.
func (sc *ServerContext) AddDatabaseFromConfig(config DatabaseConfig) (*db.DatabaseContext, error) {
return sc.getOrAddDatabaseFromConfig(config, false, false)
return sc.getOrAddDatabaseFromConfig(config, false, db.GetConnectToBucketFn(false))
}

// AddDatabaseFromConfigFailFast adds a database to the ServerContext given its configuration and fails fast.
// If an existing config is found for the name, returns an error.
func (sc *ServerContext) AddDatabaseFromConfigFailFast(config DatabaseConfig) (*db.DatabaseContext, error) {
return sc.getOrAddDatabaseFromConfig(config, false, true)
return sc.getOrAddDatabaseFromConfig(config, false, db.GetConnectToBucketFn(true))
}

func (sc *ServerContext) processEventHandlersForEvent(events []*EventConfig, eventType db.EventType, dbcontext *db.DatabaseContext) error {
Expand Down
10 changes: 6 additions & 4 deletions rest/server_context_test.go
Expand Up @@ -21,6 +21,8 @@ import (
"testing"
"time"

"github.com/couchbase/sync_gateway/db"

"github.com/couchbase/gocbcore/v10/connstr"
"github.com/couchbase/sync_gateway/base"
"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -162,7 +164,7 @@ func TestGetOrAddDatabaseFromConfig(t *testing.T) {

// Get or add database name from config without valid database name; throws 400 Illegal database name error
dbConfig := DbConfig{OldRevExpirySeconds: &oldRevExpirySeconds, LocalDocExpirySecs: &localDocExpirySecs}
dbContext, err := serverContext._getOrAddDatabaseFromConfig(DatabaseConfig{DbConfig: dbConfig}, false, false)
dbContext, err := serverContext._getOrAddDatabaseFromConfig(DatabaseConfig{DbConfig: dbConfig}, false, db.GetConnectToBucketFn(false))
assert.Nil(t, dbContext, "Can't create database context without a valid database name")
assert.Error(t, err, "It should throw 400 Illegal database name")
assert.Contains(t, err.Error(), strconv.Itoa(http.StatusBadRequest))
Expand All @@ -181,7 +183,7 @@ func TestGetOrAddDatabaseFromConfig(t *testing.T) {
BucketConfig: BucketConfig{Server: &server, Bucket: &bucketName},
}

dbContext, err = serverContext._getOrAddDatabaseFromConfig(DatabaseConfig{DbConfig: dbConfig}, false, false)
dbContext, err = serverContext._getOrAddDatabaseFromConfig(DatabaseConfig{DbConfig: dbConfig}, false, db.GetConnectToBucketFn(false))
assert.Nil(t, dbContext, "Can't create database context from config with unrecognized value for import_docs")
assert.Error(t, err, "It should throw Unrecognized value for import_docs")

Expand All @@ -208,14 +210,14 @@ func TestGetOrAddDatabaseFromConfig(t *testing.T) {
AutoImport: false,
}

dbContext, err = serverContext._getOrAddDatabaseFromConfig(DatabaseConfig{DbConfig: dbConfig}, false, false)
dbContext, err = serverContext._getOrAddDatabaseFromConfig(DatabaseConfig{DbConfig: dbConfig}, false, db.GetConnectToBucketFn(false))
assert.Nil(t, dbContext, "Can't create database context with duplicate database name")
assert.Error(t, err, "It should throw 412 Duplicate database names")
assert.Contains(t, err.Error(), strconv.Itoa(http.StatusPreconditionFailed))

// Get or add database from config with duplicate database name and useExisting as true
// Existing database context should be returned
dbContext, err = serverContext._getOrAddDatabaseFromConfig(DatabaseConfig{DbConfig: dbConfig}, true, false)
dbContext, err = serverContext._getOrAddDatabaseFromConfig(DatabaseConfig{DbConfig: dbConfig}, true, db.GetConnectToBucketFn(false))
assert.NoError(t, err, "No error while trying to get the existing database name")
assert.Equal(t, server, dbContext.BucketSpec.Server)
assert.Equal(t, bucketName, dbContext.BucketSpec.BucketName)
Expand Down
78 changes: 67 additions & 11 deletions rest/utilities_testing.go
Expand Up @@ -43,14 +43,15 @@ import (

// RestTesterConfig represents configuration for sync gateway
type RestTesterConfig struct {
guestEnabled bool // If this is true, Admin Party is in full effect
SyncFn string // put the sync() function source in here (optional)
DatabaseConfig *DatabaseConfig // Supports additional config options. BucketConfig, Name, Sync, Unsupported will be ignored (overridden)
InitSyncSeq uint64 // If specified, initializes _sync:seq on bucket creation. Not supported when running against walrus
EnableNoConflictsMode bool // Enable no-conflicts mode. By default, conflicts will be allowed, which is the default behavior
TestBucket *base.TestBucket // If set, use this bucket instead of requesting a new one.
adminInterface string // adminInterface overrides the default admin interface.
sgReplicateEnabled bool // sgReplicateManager disabled by default for RestTester
guestEnabled bool // If this is true, Admin Party is in full effect
SyncFn string // put the sync() function source in here (optional)
DatabaseConfig *DatabaseConfig // Supports additional config options. BucketConfig, Name, Sync, Unsupported will be ignored (overridden)
InitSyncSeq uint64 // If specified, initializes _sync:seq on bucket creation. Not supported when running against walrus
EnableNoConflictsMode bool // Enable no-conflicts mode. By default, conflicts will be allowed, which is the default behavior
TestBucket *base.TestBucket // If set, use this bucket instead of requesting a new one.
leakyBucketConfig *base.LeakyBucketConfig // Set to create and use a leaky bucket on the RT and DB. A test bucket cannot be passed in if using this option.
adminInterface string // adminInterface overrides the default admin interface.
sgReplicateEnabled bool // sgReplicateManager disabled by default for RestTester
hideProductInfo bool
adminInterfaceAuthentication bool
metricsInterfaceAuthentication bool
Expand Down Expand Up @@ -93,7 +94,6 @@ func NewRestTester(tb testing.TB, restConfig *RestTesterConfig) *RestTester {
}

func (rt *RestTester) Bucket() base.Bucket {

if rt.tb == nil {
panic("RestTester not properly initialized please use NewRestTester function")
} else if rt.closed {
Expand All @@ -108,6 +108,14 @@ func (rt *RestTester) Bucket() base.Bucket {
testBucket := rt.RestTesterConfig.TestBucket
if testBucket == nil {
testBucket = base.GetTestBucket(rt.tb)
if rt.leakyBucketConfig != nil {
leakyConfig := *rt.leakyBucketConfig
// Ignore closures to avoid double closing panics
leakyConfig.IgnoreClose = true
testBucket = testBucket.LeakyBucketClone(leakyConfig)
}
} else if rt.leakyBucketConfig != nil {
rt.tb.Fatalf("A passed in TestBucket cannot be used on the RestTester when defining a leakyBucketConfig")
}
rt.testBucket = testBucket

Expand Down Expand Up @@ -242,7 +250,34 @@ func (rt *RestTester) Bucket() base.Bucket {

rt.DatabaseConfig.SGReplicateEnabled = base.BoolPtr(rt.RestTesterConfig.sgReplicateEnabled)

_, err = rt.RestTesterServerContext.AddDatabaseFromConfig(*rt.DatabaseConfig)
if rt.leakyBucketConfig != nil {
// Scopes and collections have to be set on the bucket being passed in for the db to use.
// WIP: Collections Phase 1 - Grab just one scope/collection from the defined set.
// Phase 2 (multi collection) means DatabaseContext needs a set of BucketSpec/Collections, not just one...
var scope, collection *string
for scopeName, scopeConfig := range rt.RestTesterConfig.DatabaseConfig.Scopes {
scope = &scopeName
for collectionName := range scopeConfig.Collections {
collection = &collectionName
break
}
}
if scope != nil && collection != nil {
collectionBucket, err := base.AsCollection(testBucket.Bucket)
if err != nil {
rt.tb.Fatalf("Could not get collection from bucket with type %T: %v", testBucket.Bucket, err)
}

collectionBucket.Spec.Scope = scope
collectionBucket.Spec.Collection = collection
collectionBucket.Collection = collectionBucket.Collection.Bucket().Scope(*scope).Collection(*collection)
}

_, err = rt.RestTesterServerContext.AddDatabaseFromConfigWithBucket(rt.tb, *rt.DatabaseConfig, testBucket.Bucket)
} else {
_, err = rt.RestTesterServerContext.AddDatabaseFromConfig(*rt.DatabaseConfig)
}

if err != nil {
rt.tb.Fatalf("Error from AddDatabaseFromConfig: %v", err)
}
Expand All @@ -261,10 +296,23 @@ func (rt *RestTester) Bucket() base.Bucket {

// PostStartup (without actually waiting 5 seconds)
close(rt.RestTesterServerContext.hasStarted)

return rt.testBucket.Bucket
}

// LeakyBucket gets the bucket from the RestTester as a leaky bucket allowing for callbacks to be set on the fly.
// The RestTester must have been set up to create and use a leaky bucket by setting leakyBucketConfig in the RT
// config when calling NewRestTester.
func (rt *RestTester) LeakyBucket() *base.LeakyBucket {
if rt.leakyBucketConfig == nil {
rt.tb.Fatalf("Cannot get leaky bucket when leakyBucketConfig was not set on RestTester initialisation")
}
leakyBucket, ok := base.AsLeakyBucket(rt.Bucket())
if !ok {
rt.tb.Fatalf("Could not get bucket (type %T) as a leaky bucket", rt.Bucket())
}
return leakyBucket
}

func (rt *RestTester) ServerContext() *ServerContext {
rt.Bucket()
return rt.RestTesterServerContext
Expand Down Expand Up @@ -872,6 +920,14 @@ func (s *SlowResponseRecorder) Write(buf []byte) (int, error) {
return numBytesWritten, err
}

// AddDatabaseFromConfigWithBucket adds a database to the ServerContext and sets a specific bucket on the database context.
// If an existing config is found for the name, returns an error.
func (sc *ServerContext) AddDatabaseFromConfigWithBucket(tb testing.TB, config DatabaseConfig, bucket base.Bucket) (*db.DatabaseContext, error) {
return sc.getOrAddDatabaseFromConfig(config, false, func(spec base.BucketSpec) (base.Bucket, error) {
return bucket, nil
})
}

// The parameters used to create a BlipTester
type BlipTesterSpec struct {

Expand Down

0 comments on commit 943dcf4

Please sign in to comment.