Skip to content

Commit

Permalink
CBG-1663: Create new temporary connection for Bootstrap ops (#5218)
Browse files Browse the repository at this point in the history
* CBG-1663: Create new temporary connection for Bootstrap ops

* Tweak after rebase
  • Loading branch information
JRascagneres committed Sep 15, 2021
1 parent 4c74f9c commit e5618ac
Show file tree
Hide file tree
Showing 7 changed files with 79 additions and 57 deletions.
66 changes: 45 additions & 21 deletions base/bootstrap.go
Expand Up @@ -20,13 +20,12 @@ type BootstrapConnection interface {
InsertConfig(bucket, groupID string, value interface{}) (newCAS uint64, err error)
// UpdateConfig updates an existing database config for a given bucket and config group ID. updateCallback can return nil to remove the config.
UpdateConfig(bucket, groupID string, updateCallback func(rawBucketConfig []byte) (updatedConfig []byte, err error)) (newCAS uint64, err error)
// Close closes the connection
Close() error
}

// CouchbaseCluster is a GoCBv2 implementation of BootstrapConnection
type CouchbaseCluster struct {
c *gocb.Cluster
server string
clusterOptions gocb.ClusterOptions
}

var _ BootstrapConnection = &CouchbaseCluster{}
Expand Down Expand Up @@ -55,12 +54,17 @@ func NewCouchbaseCluster(server, username, password,
RetryStrategy: &goCBv2FailFastRetryStrategy{},
}

cluster, err := gocb.Connect(server, clusterOptions)
return &CouchbaseCluster{server: server, clusterOptions: clusterOptions}, nil
}

// connect attempts to open a gocb.Cluster connection. Callers will be responsible for closing the connection.
func (cc *CouchbaseCluster) connect() (*gocb.Cluster, error) {
cluster, err := gocb.Connect(cc.server, cc.clusterOptions)
if err != nil {
return nil, err
}

err = cluster.WaitUntilReady(time.Second*5, &gocb.WaitUntilReadyOptions{
err = cluster.WaitUntilReady(time.Second*10, &gocb.WaitUntilReadyOptions{
DesiredState: gocb.ClusterStateOnline,
ServiceTypes: []gocb.ServiceType{gocb.ServiceTypeManagement},
RetryStrategy: &goCBv2FailFastRetryStrategy{},
Expand All @@ -70,15 +74,24 @@ func NewCouchbaseCluster(server, username, password,
return nil, err
}

return &CouchbaseCluster{c: cluster}, nil
return cluster, nil
}

func (cc *CouchbaseCluster) GetConfigBuckets() ([]string, error) {
if cc == nil {
return nil, errors.New("nil CouchbaseCluster")
}

buckets, err := cc.c.Buckets().GetAllBuckets(nil)
connection, err := cc.connect()
if err != nil {
return nil, err
}

defer func() {
_ = connection.Close(&gocb.ClusterCloseOptions{})
}()

buckets, err := connection.Buckets().GetAllBuckets(nil)
if err != nil {
return nil, err
}
Expand All @@ -96,11 +109,13 @@ func (cc *CouchbaseCluster) GetConfig(location, groupID string, valuePtr interfa
return 0, errors.New("nil CouchbaseCluster")
}

b, err := cc.getBucket(location)
b, teardown, err := cc.getBucket(location)
if err != nil {
return 0, err
}

defer teardown()

res, err := b.DefaultCollection().Get(PersistentConfigPrefix+groupID, &gocb.GetOptions{
Timeout: time.Second * 10,
RetryStrategy: gocb.NewBestEffortRetryStrategy(nil),
Expand All @@ -124,10 +139,11 @@ func (cc *CouchbaseCluster) InsertConfig(location, groupID string, value interfa
return 0, errors.New("nil CouchbaseCluster")
}

b, err := cc.getBucket(location)
b, teardown, err := cc.getBucket(location)
if err != nil {
return 0, err
}
defer teardown()

docID := PersistentConfigPrefix + groupID
res, err := b.DefaultCollection().Insert(docID, value, nil)
Expand All @@ -146,10 +162,11 @@ func (cc *CouchbaseCluster) UpdateConfig(location, groupID string, updateCallbac
return 0, errors.New("nil CouchbaseCluster")
}

b, err := cc.getBucket(location)
b, teardown, err := cc.getBucket(location)
if err != nil {
return 0, err
}
defer teardown()

collection := b.DefaultCollection()

Expand Down Expand Up @@ -200,25 +217,32 @@ func (cc *CouchbaseCluster) UpdateConfig(location, groupID string, updateCallbac

}

func (cc *CouchbaseCluster) Close() error {
if cc.c == nil {
return nil
// getBucket returns the bucket after waiting for it to be ready.
func (cc *CouchbaseCluster) getBucket(bucketName string) (b *gocb.Bucket, teardownFn func(), err error) {
connection, err := cc.connect()
if err != nil {
return nil, nil, err
}
return cc.c.Close(&gocb.ClusterCloseOptions{})
}

// getBucket returns the bucket after waiting for it to be ready.
func (cc *CouchbaseCluster) getBucket(bucketName string) (*gocb.Bucket, error) {
b := cc.c.Bucket(bucketName)
err := b.WaitUntilReady(time.Second*10, &gocb.WaitUntilReadyOptions{
b = connection.Bucket(bucketName)
err = b.WaitUntilReady(time.Second*10, &gocb.WaitUntilReadyOptions{
DesiredState: gocb.ClusterStateOnline,
RetryStrategy: gocb.NewBestEffortRetryStrategy(nil),
ServiceTypes: []gocb.ServiceType{gocb.ServiceTypeKeyValue},
})
if err != nil {
return nil, err
_ = connection.Close(&gocb.ClusterCloseOptions{})
return nil, nil, err
}
return b, nil

teardownFn = func() {
err := connection.Close(&gocb.ClusterCloseOptions{})
if err != nil {
Warnf("Failed to close cluster connection: %v", err)
}
}

return b, teardownFn, nil
}

// ConfigMerge applies non-empty fields from b onto non-empty fields on a
Expand Down
1 change: 1 addition & 0 deletions base/collection.go
Expand Up @@ -93,6 +93,7 @@ func GetCouchbaseCollection(spec BucketSpec) (*Collection, error) {
// TODO: identify required services and add to WaitUntilReadyOptions
err = bucket.WaitUntilReady(30*time.Second, nil)
if err != nil {
_ = cluster.Close(&gocb.ClusterCloseOptions{})
Warnf("Error waiting for bucket to be ready: %v", err)
return nil, err
}
Expand Down
23 changes: 23 additions & 0 deletions rest/admin_api_test.go
Expand Up @@ -3488,3 +3488,26 @@ func TestDbConfigDoesNotIncludeCredentials(t *testing.T) {
assert.Equal(t, "", dbConfig.KeyPath)

}

func TestCreateDbOnNonExistentBucket(t *testing.T) {
if base.UnitTestUrlIsWalrus() {
t.Skip("This test only works against Couchbase Server")
}

defer base.SetUpTestLogging(base.LevelInfo, base.KeyHTTP)()

// Start SG with no databases in bucket(s)
config := bootstrapStartupConfigForTest(t)
sc, err := setupServerContext(&config, true)
require.NoError(t, err)
defer sc.Close()

serverErr := make(chan error, 0)
go func() {
serverErr <- startServer(&config, sc)
}()
require.NoError(t, sc.waitForRESTAPIs())

resp := bootstrapAdminRequest(t, http.MethodPut, "/db/", `{"bucket": "nonExistentBucket"}`)
assert.Equal(t, http.StatusInternalServerError, resp.StatusCode)
}
2 changes: 1 addition & 1 deletion rest/config.go
Expand Up @@ -949,7 +949,7 @@ func setupServerContext(config *StartupConfig, persistentConfig bool) (*ServerCo

// Fetch database configs from bucket and start polling for new buckets and config updates.
if sc.persistentConfig {
couchbaseCluster, err := establishCouchbaseClusterConnection(sc.config)
couchbaseCluster, err := createCouchbaseClusterFromStartupConfig(sc.config)
if err != nil {
return nil, err
}
Expand Down
28 changes: 7 additions & 21 deletions rest/main.go
Expand Up @@ -217,15 +217,11 @@ func automaticConfigUpgrade(configPath string) (sc *StartupConfig, disablePersis
}

// Attempt to establish connection to server
cluster, err := establishCouchbaseClusterConnection(startupConfig)
cluster, err := createCouchbaseClusterFromStartupConfig(startupConfig)
if err != nil {
return nil, false, err
}

defer func() {
_ = cluster.Close()
}()

// Write database configs to CBS with groupID "default"
for _, dbConfig := range dbConfigs {
dbc := dbConfig.ToDatabaseConfig()
Expand Down Expand Up @@ -353,24 +349,14 @@ func backupCurrentConfigFile(sourcePath string) (string, error) {
return backupPath, nil
}

func establishCouchbaseClusterConnection(config *StartupConfig) (*base.CouchbaseCluster, error) {
err, c := base.RetryLoop("Cluster Bootstrap", func() (shouldRetry bool, err error, value interface{}) {
cluster, err := base.NewCouchbaseCluster(config.Bootstrap.Server, config.Bootstrap.Username,
config.Bootstrap.Password, config.Bootstrap.X509CertPath, config.Bootstrap.X509KeyPath,
config.Bootstrap.CACertPath, config.Bootstrap.ServerTLSSkipVerify)
if err != nil {
base.Infof(base.KeyConfig, "Couldn't connect to bootstrap cluster: %v - will retry...", err)
return true, err, nil
}

return false, nil, cluster
}, base.CreateSleeperFunc(27, 1000)) // ~2 mins total - 5 second gocb WaitUntilReady timeout and 1 second interval
func createCouchbaseClusterFromStartupConfig(config *StartupConfig) (*base.CouchbaseCluster, error) {
cluster, err := base.NewCouchbaseCluster(config.Bootstrap.Server, config.Bootstrap.Username,
config.Bootstrap.Password, config.Bootstrap.X509CertPath, config.Bootstrap.X509KeyPath,
config.Bootstrap.CACertPath, config.Bootstrap.ServerTLSSkipVerify)
if err != nil {
base.Infof(base.KeyConfig, "Couldn't create couchbase cluster instance: %v", err)
return nil, err
}

base.Infof(base.KeyConfig, "Successfully connected to cluster")
clusterConnection := c.(*base.CouchbaseCluster)

return clusterConnection, nil
return cluster, nil
}
10 changes: 2 additions & 8 deletions rest/persistent_config_test.go
Expand Up @@ -82,11 +82,8 @@ func TestAutomaticConfigUpgrade(t *testing.T) {

assert.Equal(t, config, string(writtenBackupFile))

cbs, err := establishCouchbaseClusterConnection(startupConfig)
cbs, err := createCouchbaseClusterFromStartupConfig(startupConfig)
require.NoError(t, err)
defer func() {
_ = cbs.Close()
}()

var dbConfig DbConfig
_, err = cbs.GetConfig(tb.GetName(), persistentConfigDefaultGroupID, &dbConfig)
Expand Down Expand Up @@ -204,11 +201,8 @@ func TestAutomaticConfigUpgradeExistingConfig(t *testing.T) {
startupConfig, _, err := automaticConfigUpgrade(updatedConfigPath)
require.NoError(t, err)

cbs, err := establishCouchbaseClusterConnection(startupConfig)
cbs, err := createCouchbaseClusterFromStartupConfig(startupConfig)
require.NoError(t, err)
defer func() {
_ = cbs.Close()
}()

var dbConfig DbConfig
_, err = cbs.GetConfig(tb.GetName(), persistentConfigDefaultGroupID, &dbConfig)
Expand Down
6 changes: 0 additions & 6 deletions rest/server_context.go
Expand Up @@ -198,12 +198,6 @@ func (sc *ServerContext) Close() {
}
sc._httpServers = nil

if c := sc.bootstrapContext.connection; c != nil {
if err := c.Close(); err != nil {
base.Warnf("Error closing bootstrap cluster connection: %v", err)
}
}

if agent := sc.GoCBAgent; agent != nil {
if err := agent.Close(); err != nil {
base.Warnf("Error closing agent connection: %v", err)
Expand Down

0 comments on commit e5618ac

Please sign in to comment.