Skip to content

Commit

Permalink
GOCBC-182: Use gocbconnstr and improve client configurability.
Browse files Browse the repository at this point in the history
Change-Id: I51be1c198f1e813b91415bd89b1b179f3ddd08f7
Reviewed-on: http://review.couchbase.org/76423
Reviewed-by: Mark Nunberg <mark.nunberg@couchbase.com>
Tested-by: Brett Lawson <brett19@gmail.com>
  • Loading branch information
brett19 committed Apr 10, 2017
1 parent ccd582e commit 2b444be
Show file tree
Hide file tree
Showing 2 changed files with 72 additions and 132 deletions.
18 changes: 9 additions & 9 deletions bucket_cluster_test.go
Expand Up @@ -28,7 +28,7 @@ func TestBootstrapOn(t *testing.T) {
if err != nil {
t.Fatalf("Multi-host connection string failed: %v", err)
}
if len(c.spec.HttpHosts) != 3 || len(c.spec.MemcachedHosts) != 3 {
if len(c.agentConfig.HttpAddrs) != 3 || len(c.agentConfig.MemdAddrs) != 3 {
t.Fatal("Wrong number of hosts for http/memcached")
}

Expand All @@ -37,22 +37,22 @@ func TestBootstrapOn(t *testing.T) {
if err != nil {
t.Fatalf("bootstrap_on=http: %v", err)
}
if len(c.spec.HttpHosts) != 3 {
t.Fatalf("HttpHosts is not 3 (%v)", c.spec.HttpHosts)
if len(c.agentConfig.HttpAddrs) != 3 {
t.Fatalf("HttpHosts is not 3 (%v)", c.agentConfig.HttpAddrs)
}
if len(c.spec.MemcachedHosts) != 0 {
t.Fatalf("MemcachedHosts is not 0: %v", c.spec.MemcachedHosts)
if len(c.agentConfig.MemdAddrs) != 0 {
t.Fatalf("MemcachedHosts is not 0: %v", c.agentConfig.MemdAddrs)
}

c, err = Connect("couchbase://foo.com,bar.com,baz.com?bootstrap_on=cccp")
if err != nil {
t.Fatalf("bootstrap_on=cccp: %v", err)
}
if len(c.spec.MemcachedHosts) != 3 {
t.Fatalf("Expected 3 hosts in memcached: %v", c.spec.MemcachedHosts)
if len(c.agentConfig.MemdAddrs) != 3 {
t.Fatalf("Expected 3 hosts in memcached: %v", c.agentConfig.MemdAddrs)
}
if len(c.spec.HttpHosts) != 0 {
t.Fatalf("Expected 0 hosts in http: %v", c.spec.HttpHosts)
if len(c.agentConfig.HttpAddrs) != 0 {
t.Fatalf("Expected 0 hosts in http: %v", c.agentConfig.HttpAddrs)
}

// Should fail if there are no hosts
Expand Down
186 changes: 63 additions & 123 deletions cluster.go
@@ -1,26 +1,22 @@
package gocb

import (
"crypto/tls"
"crypto/x509"
"errors"
"fmt"
"gopkg.in/couchbase/gocbcore.v6"
"io/ioutil"
"gopkg.in/couchbaselabs/gocbconnstr.v1"
"net/http"
"strconv"
"sync"
"time"
)

// Cluster represents a connection to a specific Couchbase cluster.
type Cluster struct {
spec connSpec
auth Authenticator
connectTimeout time.Duration
serverConnectTimeout time.Duration
n1qlTimeout time.Duration
ftsTimeout time.Duration
nmvRetryDelay time.Duration
tlsConfig *tls.Config
auth Authenticator
agentConfig gocbcore.AgentConfig
n1qlTimeout time.Duration
ftsTimeout time.Duration

clusterLock sync.RWMutex
queryCache map[string]*n1qlCache
Expand All @@ -29,67 +25,78 @@ type Cluster struct {

// Connect creates a new Cluster object for a specific cluster.
func Connect(connSpecStr string) (*Cluster, error) {
spec, err := parseConnSpec(connSpecStr)
spec, err := gocbconnstr.Parse(connSpecStr)
if err != nil {
return nil, err
}

if spec.Bucket != "" {
return nil, errors.New("Connection string passed to Connect() must not have any bucket specified!")
}

csResolveDnsSrv(&spec)

// Get bootstrap_on option to determine which, if any, of the bootstrap nodes should be cleared
switch spec.Options.Get("bootstrap_on") {
case "http":
spec.MemcachedHosts = nil
if len(spec.HttpHosts) == 0 {
return nil, errors.New("bootstrap_on=http but no HTTP hosts in connection string")
}
case "cccp":
spec.HttpHosts = nil
if len(spec.MemcachedHosts) == 0 {
return nil, errors.New("bootstrap_on=cccp but no CCCP/Memcached hosts in connection string")
fetchOption := func(name string) (string, bool) {
optValue := spec.Options[name]
if len(optValue) == 0 {
return "", false
}
case "both":
case "":
// Do nothing
break
default:
return nil, errors.New("bootstrap_on={http,cccp,both}")
return optValue[len(optValue)-1], true
}

config := gocbcore.AgentConfig{
ConnectTimeout: 60000 * time.Millisecond,
ServerConnectTimeout: 7000 * time.Millisecond,
NmvRetryDelay: 100 * time.Millisecond,
}
err = config.FromConnStr(connSpecStr)
if err != nil {
return nil, err
}

cluster := &Cluster{
spec: spec,
connectTimeout: 60000 * time.Millisecond,
serverConnectTimeout: 7000 * time.Millisecond,
n1qlTimeout: 75 * time.Second,
ftsTimeout: 75 * time.Second,
nmvRetryDelay: 100 * time.Millisecond,
agentConfig: config,
n1qlTimeout: 75 * time.Second,
ftsTimeout: 75 * time.Second,

queryCache: make(map[string]*n1qlCache),
}

if valStr, ok := fetchOption("n1ql_timeout"); ok {
val, err := strconv.ParseInt(valStr, 10, 64)
if err != nil {
return nil, fmt.Errorf("n1ql_timeout option must be a number")
}
cluster.n1qlTimeout = time.Duration(val) * time.Millisecond
}

if valStr, ok := fetchOption("fts_timeout"); ok {
val, err := strconv.ParseInt(valStr, 10, 64)
if err != nil {
return nil, fmt.Errorf("fts_timeout option must be a number")
}
cluster.ftsTimeout = time.Duration(val) * time.Millisecond
}

return cluster, nil
}

// ConnectTimeout returns the maximum time to wait when attempting to connect to a bucket.
func (c *Cluster) ConnectTimeout() time.Duration {
return c.connectTimeout
return c.agentConfig.ConnectTimeout
}

// SetConnectTimeout sets the maximum time to wait when attempting to connect to a bucket.
func (c *Cluster) SetConnectTimeout(timeout time.Duration) {
c.connectTimeout = timeout
c.agentConfig.ConnectTimeout = timeout
}

// ServerConnectTimeout returns the maximum time to attempt to connect to a single node.
func (c *Cluster) ServerConnectTimeout() time.Duration {
return c.serverConnectTimeout
return c.agentConfig.ServerConnectTimeout
}

// SetServerConnectTimeout sets the maximum time to attempt to connect to a single node.
func (c *Cluster) SetServerConnectTimeout(timeout time.Duration) {
c.serverConnectTimeout = timeout
c.agentConfig.ServerConnectTimeout = timeout
}

// N1qlTimeout returns the maximum time to wait for a cluster-level N1QL query to complete.
Expand All @@ -114,12 +121,12 @@ func (c *Cluster) SetFtsTimeout(timeout time.Duration) {

// NmvRetryDelay returns the time to wait between retrying an operation due to not my vbucket.
func (c *Cluster) NmvRetryDelay() time.Duration {
return c.nmvRetryDelay
return c.agentConfig.NmvRetryDelay
}

// SetNmvRetryDelay sets the time to wait between retrying an operation due to not my vbucket.
func (c *Cluster) SetNmvRetryDelay(delay time.Duration) {
c.nmvRetryDelay = delay
c.agentConfig.NmvRetryDelay = delay
}

// InvalidateQueryCache forces the internal cache of prepared queries to be cleared.
Expand All @@ -129,74 +136,18 @@ func (c *Cluster) InvalidateQueryCache() {
c.clusterLock.Unlock()
}

func specToHosts(spec connSpec) ([]string, []string, bool) {
var memdHosts []string
var httpHosts []string

for _, specHost := range spec.HttpHosts {
httpHosts = append(httpHosts, specHost.HostPort())
}

for _, specHost := range spec.MemcachedHosts {
memdHosts = append(memdHosts, specHost.HostPort())
}

return memdHosts, httpHosts, spec.Scheme.IsSSL()
}

func (c *Cluster) makeAgentConfig(bucket, username, password string, mt bool) (*gocbcore.AgentConfig, error) {
authFn := func(client gocbcore.AuthClient, deadline time.Time) error {
if err := gocbcore.SaslAuthPlain(username, password, client, deadline); err != nil {
return err
}

if bucket != username {
if err := client.ExecSelectBucket([]byte(bucket), deadline); err != nil {
return err
}
}

return nil
}

memdHosts, httpHosts, isSslHosts := specToHosts(c.spec)

var tlsConfig *tls.Config
if isSslHosts {
func (c *Cluster) makeAgentConfig(bucket, username, password string, forceMt bool) (*gocbcore.AgentConfig, error) {
config := c.agentConfig

certpath := c.spec.Options.Get("certpath")
config.BucketName = bucket
config.Username = username
config.Password = password

tlsConfig = &tls.Config{}
if certpath == "" {
tlsConfig.InsecureSkipVerify = true
} else {
cacert, err := ioutil.ReadFile(certpath)
if err != nil {
return nil, err
}

roots := x509.NewCertPool()
ok := roots.AppendCertsFromPEM(cacert)
if !ok {
return nil, ErrInvalidCert
}
tlsConfig.RootCAs = roots
}
if forceMt {
config.UseMutationTokens = true
}

return &gocbcore.AgentConfig{
MemdAddrs: memdHosts,
HttpAddrs: httpHosts,
TlsConfig: tlsConfig,
BucketName: bucket,
Username: username,
Password: password,
AuthHandler: authFn,
UseMutationTokens: mt,
ConnectTimeout: c.connectTimeout,
ServerConnectTimeout: c.serverConnectTimeout,
NmvRetryDelay: c.nmvRetryDelay,
}, nil
return &config, nil
}

// Authenticate specifies an Authenticator interface to use to authenticate with cluster services.
Expand All @@ -205,7 +156,7 @@ func (c *Cluster) Authenticate(auth Authenticator) error {
return nil
}

func (c *Cluster) openBucket(bucket, password string, mt bool) (*Bucket, error) {
func (c *Cluster) openBucket(bucket, password string, forceMt bool) (*Bucket, error) {
username := bucket
if password == "" {
if c.auth != nil {
Expand All @@ -215,7 +166,7 @@ func (c *Cluster) openBucket(bucket, password string, mt bool) (*Bucket, error)
}
}

agentConfig, err := c.makeAgentConfig(bucket, username, password, mt)
agentConfig, err := c.makeAgentConfig(bucket, username, password, forceMt)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -264,27 +215,16 @@ func (c *Cluster) Manager(username, password string) *ClusterManager {
}
}

_, httpHosts, isSslHosts := specToHosts(c.spec)
var mgmtHosts []string

for _, host := range httpHosts {
if isSslHosts {
for _, host := range c.agentConfig.HttpAddrs {
if c.agentConfig.TlsConfig != nil {
mgmtHosts = append(mgmtHosts, "https://"+host)
} else {
mgmtHosts = append(mgmtHosts, "http://"+host)
}
}

var tlsConfig *tls.Config
if isSslHosts {
tlsConfig = c.tlsConfig
if tlsConfig == nil {
tlsConfig = &tls.Config{
InsecureSkipVerify: true,
}
}
}

tlsConfig := c.agentConfig.TlsConfig
return &ClusterManager{
hosts: mgmtHosts,
username: userPass.Username,
Expand Down

0 comments on commit 2b444be

Please sign in to comment.