From 2b444be098668f9107b3bb3acba7c0ce941db141 Mon Sep 17 00:00:00 2001 From: Brett Lawson Date: Wed, 5 Apr 2017 11:18:52 -0700 Subject: [PATCH] GOCBC-182: Use gocbconnstr and improve client configurability. Change-Id: I51be1c198f1e813b91415bd89b1b179f3ddd08f7 Reviewed-on: http://review.couchbase.org/76423 Reviewed-by: Mark Nunberg Tested-by: Brett Lawson --- bucket_cluster_test.go | 18 ++-- cluster.go | 186 ++++++++++++++--------------------------- 2 files changed, 72 insertions(+), 132 deletions(-) diff --git a/bucket_cluster_test.go b/bucket_cluster_test.go index 17d7bbde..4ec1587b 100644 --- a/bucket_cluster_test.go +++ b/bucket_cluster_test.go @@ -28,7 +28,7 @@ func TestBootstrapOn(t *testing.T) { if err != nil { t.Fatalf("Multi-host connection string failed: %v", err) } - if len(c.spec.HttpHosts) != 3 || len(c.spec.MemcachedHosts) != 3 { + if len(c.agentConfig.HttpAddrs) != 3 || len(c.agentConfig.MemdAddrs) != 3 { t.Fatal("Wrong number of hosts for http/memcached") } @@ -37,22 +37,22 @@ func TestBootstrapOn(t *testing.T) { if err != nil { t.Fatalf("bootstrap_on=http: %v", err) } - if len(c.spec.HttpHosts) != 3 { - t.Fatalf("HttpHosts is not 3 (%v)", c.spec.HttpHosts) + if len(c.agentConfig.HttpAddrs) != 3 { + t.Fatalf("HttpHosts is not 3 (%v)", c.agentConfig.HttpAddrs) } - if len(c.spec.MemcachedHosts) != 0 { - t.Fatalf("MemcachedHosts is not 0: %v", c.spec.MemcachedHosts) + if len(c.agentConfig.MemdAddrs) != 0 { + t.Fatalf("MemcachedHosts is not 0: %v", c.agentConfig.MemdAddrs) } c, err = Connect("couchbase://foo.com,bar.com,baz.com?bootstrap_on=cccp") if err != nil { t.Fatalf("bootstrap_on=cccp: %v", err) } - if len(c.spec.MemcachedHosts) != 3 { - t.Fatalf("Expected 3 hosts in memcached: %v", c.spec.MemcachedHosts) + if len(c.agentConfig.MemdAddrs) != 3 { + t.Fatalf("Expected 3 hosts in memcached: %v", c.agentConfig.MemdAddrs) } - if len(c.spec.HttpHosts) != 0 { - t.Fatalf("Expected 0 hosts in http: %v", c.spec.HttpHosts) + if len(c.agentConfig.HttpAddrs) != 0 { + t.Fatalf("Expected 0 hosts in http: %v", c.agentConfig.HttpAddrs) } // Should fail if there are no hosts diff --git a/cluster.go b/cluster.go index 3b5852df..baac9be9 100644 --- a/cluster.go +++ b/cluster.go @@ -1,26 +1,22 @@ package gocb import ( - "crypto/tls" - "crypto/x509" "errors" + "fmt" "gopkg.in/couchbase/gocbcore.v6" - "io/ioutil" + "gopkg.in/couchbaselabs/gocbconnstr.v1" "net/http" + "strconv" "sync" "time" ) // Cluster represents a connection to a specific Couchbase cluster. type Cluster struct { - spec connSpec - auth Authenticator - connectTimeout time.Duration - serverConnectTimeout time.Duration - n1qlTimeout time.Duration - ftsTimeout time.Duration - nmvRetryDelay time.Duration - tlsConfig *tls.Config + auth Authenticator + agentConfig gocbcore.AgentConfig + n1qlTimeout time.Duration + ftsTimeout time.Duration clusterLock sync.RWMutex queryCache map[string]*n1qlCache @@ -29,67 +25,78 @@ type Cluster struct { // Connect creates a new Cluster object for a specific cluster. func Connect(connSpecStr string) (*Cluster, error) { - spec, err := parseConnSpec(connSpecStr) + spec, err := gocbconnstr.Parse(connSpecStr) if err != nil { return nil, err } + if spec.Bucket != "" { return nil, errors.New("Connection string passed to Connect() must not have any bucket specified!") } - csResolveDnsSrv(&spec) - - // Get bootstrap_on option to determine which, if any, of the bootstrap nodes should be cleared - switch spec.Options.Get("bootstrap_on") { - case "http": - spec.MemcachedHosts = nil - if len(spec.HttpHosts) == 0 { - return nil, errors.New("bootstrap_on=http but no HTTP hosts in connection string") - } - case "cccp": - spec.HttpHosts = nil - if len(spec.MemcachedHosts) == 0 { - return nil, errors.New("bootstrap_on=cccp but no CCCP/Memcached hosts in connection string") + fetchOption := func(name string) (string, bool) { + optValue := spec.Options[name] + if len(optValue) == 0 { + return "", false } - case "both": - case "": - // Do nothing - break - default: - return nil, errors.New("bootstrap_on={http,cccp,both}") + return optValue[len(optValue)-1], true + } + + config := gocbcore.AgentConfig{ + ConnectTimeout: 60000 * time.Millisecond, + ServerConnectTimeout: 7000 * time.Millisecond, + NmvRetryDelay: 100 * time.Millisecond, + } + err = config.FromConnStr(connSpecStr) + if err != nil { + return nil, err } cluster := &Cluster{ - spec: spec, - connectTimeout: 60000 * time.Millisecond, - serverConnectTimeout: 7000 * time.Millisecond, - n1qlTimeout: 75 * time.Second, - ftsTimeout: 75 * time.Second, - nmvRetryDelay: 100 * time.Millisecond, + agentConfig: config, + n1qlTimeout: 75 * time.Second, + ftsTimeout: 75 * time.Second, queryCache: make(map[string]*n1qlCache), } + + if valStr, ok := fetchOption("n1ql_timeout"); ok { + val, err := strconv.ParseInt(valStr, 10, 64) + if err != nil { + return nil, fmt.Errorf("n1ql_timeout option must be a number") + } + cluster.n1qlTimeout = time.Duration(val) * time.Millisecond + } + + if valStr, ok := fetchOption("fts_timeout"); ok { + val, err := strconv.ParseInt(valStr, 10, 64) + if err != nil { + return nil, fmt.Errorf("fts_timeout option must be a number") + } + cluster.ftsTimeout = time.Duration(val) * time.Millisecond + } + return cluster, nil } // ConnectTimeout returns the maximum time to wait when attempting to connect to a bucket. func (c *Cluster) ConnectTimeout() time.Duration { - return c.connectTimeout + return c.agentConfig.ConnectTimeout } // SetConnectTimeout sets the maximum time to wait when attempting to connect to a bucket. func (c *Cluster) SetConnectTimeout(timeout time.Duration) { - c.connectTimeout = timeout + c.agentConfig.ConnectTimeout = timeout } // ServerConnectTimeout returns the maximum time to attempt to connect to a single node. func (c *Cluster) ServerConnectTimeout() time.Duration { - return c.serverConnectTimeout + return c.agentConfig.ServerConnectTimeout } // SetServerConnectTimeout sets the maximum time to attempt to connect to a single node. func (c *Cluster) SetServerConnectTimeout(timeout time.Duration) { - c.serverConnectTimeout = timeout + c.agentConfig.ServerConnectTimeout = timeout } // N1qlTimeout returns the maximum time to wait for a cluster-level N1QL query to complete. @@ -114,12 +121,12 @@ func (c *Cluster) SetFtsTimeout(timeout time.Duration) { // NmvRetryDelay returns the time to wait between retrying an operation due to not my vbucket. func (c *Cluster) NmvRetryDelay() time.Duration { - return c.nmvRetryDelay + return c.agentConfig.NmvRetryDelay } // SetNmvRetryDelay sets the time to wait between retrying an operation due to not my vbucket. func (c *Cluster) SetNmvRetryDelay(delay time.Duration) { - c.nmvRetryDelay = delay + c.agentConfig.NmvRetryDelay = delay } // InvalidateQueryCache forces the internal cache of prepared queries to be cleared. @@ -129,74 +136,18 @@ func (c *Cluster) InvalidateQueryCache() { c.clusterLock.Unlock() } -func specToHosts(spec connSpec) ([]string, []string, bool) { - var memdHosts []string - var httpHosts []string - - for _, specHost := range spec.HttpHosts { - httpHosts = append(httpHosts, specHost.HostPort()) - } - - for _, specHost := range spec.MemcachedHosts { - memdHosts = append(memdHosts, specHost.HostPort()) - } - - return memdHosts, httpHosts, spec.Scheme.IsSSL() -} - -func (c *Cluster) makeAgentConfig(bucket, username, password string, mt bool) (*gocbcore.AgentConfig, error) { - authFn := func(client gocbcore.AuthClient, deadline time.Time) error { - if err := gocbcore.SaslAuthPlain(username, password, client, deadline); err != nil { - return err - } - - if bucket != username { - if err := client.ExecSelectBucket([]byte(bucket), deadline); err != nil { - return err - } - } - - return nil - } - - memdHosts, httpHosts, isSslHosts := specToHosts(c.spec) - - var tlsConfig *tls.Config - if isSslHosts { +func (c *Cluster) makeAgentConfig(bucket, username, password string, forceMt bool) (*gocbcore.AgentConfig, error) { + config := c.agentConfig - certpath := c.spec.Options.Get("certpath") + config.BucketName = bucket + config.Username = username + config.Password = password - tlsConfig = &tls.Config{} - if certpath == "" { - tlsConfig.InsecureSkipVerify = true - } else { - cacert, err := ioutil.ReadFile(certpath) - if err != nil { - return nil, err - } - - roots := x509.NewCertPool() - ok := roots.AppendCertsFromPEM(cacert) - if !ok { - return nil, ErrInvalidCert - } - tlsConfig.RootCAs = roots - } + if forceMt { + config.UseMutationTokens = true } - return &gocbcore.AgentConfig{ - MemdAddrs: memdHosts, - HttpAddrs: httpHosts, - TlsConfig: tlsConfig, - BucketName: bucket, - Username: username, - Password: password, - AuthHandler: authFn, - UseMutationTokens: mt, - ConnectTimeout: c.connectTimeout, - ServerConnectTimeout: c.serverConnectTimeout, - NmvRetryDelay: c.nmvRetryDelay, - }, nil + return &config, nil } // Authenticate specifies an Authenticator interface to use to authenticate with cluster services. @@ -205,7 +156,7 @@ func (c *Cluster) Authenticate(auth Authenticator) error { return nil } -func (c *Cluster) openBucket(bucket, password string, mt bool) (*Bucket, error) { +func (c *Cluster) openBucket(bucket, password string, forceMt bool) (*Bucket, error) { username := bucket if password == "" { if c.auth != nil { @@ -215,7 +166,7 @@ func (c *Cluster) openBucket(bucket, password string, mt bool) (*Bucket, error) } } - agentConfig, err := c.makeAgentConfig(bucket, username, password, mt) + agentConfig, err := c.makeAgentConfig(bucket, username, password, forceMt) if err != nil { return nil, err } @@ -264,27 +215,16 @@ func (c *Cluster) Manager(username, password string) *ClusterManager { } } - _, httpHosts, isSslHosts := specToHosts(c.spec) var mgmtHosts []string - - for _, host := range httpHosts { - if isSslHosts { + for _, host := range c.agentConfig.HttpAddrs { + if c.agentConfig.TlsConfig != nil { mgmtHosts = append(mgmtHosts, "https://"+host) } else { mgmtHosts = append(mgmtHosts, "http://"+host) } } - var tlsConfig *tls.Config - if isSslHosts { - tlsConfig = c.tlsConfig - if tlsConfig == nil { - tlsConfig = &tls.Config{ - InsecureSkipVerify: true, - } - } - } - + tlsConfig := c.agentConfig.TlsConfig return &ClusterManager{ hosts: mgmtHosts, username: userPass.Username,