Skip to content

Commit

Permalink
CBG-730 - Backport CBG-695 to 2.7.1 (#4507)
Browse files Browse the repository at this point in the history
CBG-695 - Support external alternate address heuristic (#4506)

* CBG-695 - Support external alternate address heuristic

* Pass down Cbgt logging context into alt address shims

* Always use nodes listed in nodeServices for alternate address mapping

* Improve logging

* Invert logic - default to external host (if present) - and use internal when any given connStr host matches a default/internal network hostname.
  • Loading branch information
bbrks committed Feb 21, 2020
1 parent fd9c6d6 commit a08bf70
Show file tree
Hide file tree
Showing 7 changed files with 154 additions and 49 deletions.
71 changes: 49 additions & 22 deletions base/dcp_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
sgbucket "github.com/couchbase/sg-bucket"
"github.com/google/uuid"
pkgerrors "github.com/pkg/errors"
"gopkg.in/couchbaselabs/gocbconnstr.v1"
)

// Number of non-checkpoint updates per vbucket required to trigger metadata persistence. Must be greater than zero to avoid
Expand Down Expand Up @@ -556,7 +557,7 @@ func GenerateDcpStreamName(feedID string) (string, error) {
}

// getExternalAlternateAddress returns a external alternate address for a given dest
func getExternalAlternateAddress(alternateAddressMap map[string]string, dest string) (string, error) {
func getExternalAlternateAddress(loggingCtx context.Context, alternateAddressMap map[string]string, dest string) (string, error) {

destHost, destPort, err := SplitHostPort(dest)
if err != nil {
Expand All @@ -573,15 +574,15 @@ func getExternalAlternateAddress(alternateAddressMap map[string]string, dest str
host = extHostname
}

Tracef(KeyDCP, "Found alternate address mapping %s => %s", MD(dest), MD(host+":"+port))
InfofCtx(loggingCtx, KeyDCP, "Using alternate address %s => %s", MD(dest), MD(host+":"+port))
dest = host + ":" + port
}

return dest, nil
}

// alternateAddressShims returns the 3 functions that wrap around ConnectBucket/Connect/ConnectTLS to provide alternate address support.
func alternateAddressShims(bucketSpecTLS bool) (
func alternateAddressShims(loggingCtx context.Context, bucketSpecTLS bool, connSpecAddresses []gocbconnstr.Address) (
connectBucketShim func(serverURL, poolName, bucketName string, auth couchbase.AuthHandler) (cbdatasource.Bucket, error),
connectShim func(protocol, dest string) (*memcached.Client, error),
connectTLSShim func(protocol, dest string, tlsConfig *tls.Config) (*memcached.Client, error),
Expand All @@ -592,7 +593,7 @@ func alternateAddressShims(bucketSpecTLS bool) (

// Copy of cbdatasource's default ConnectBucket function, which maps internal addresses to alternate addresses
connectBucketShim = func(serverURL, poolName, bucketName string, auth couchbase.AuthHandler) (cbdatasource.Bucket, error) {
Tracef(KeyDCP, "ConnectBucket callback: %s %s %s", MD(serverURL), poolName, MD(bucketName))
TracefCtx(loggingCtx, KeyDCP, "ConnectBucket callback: %s %s %s", MD(serverURL), poolName, MD(bucketName))

var (
err error
Expand All @@ -608,36 +609,62 @@ func alternateAddressShims(bucketSpecTLS bool) (
return nil, err
}

pool, err := client.GetPool(poolName)
// Fetch any alternate external addresses/ports and store them in the externalAlternateAddresses map
poolServices, err := client.GetPoolServices(poolName)
if err != nil {
return nil, err
}

connSpecAddressesHostMap := make(map[string]struct{}, len(connSpecAddresses))
for _, connSpecAddress := range connSpecAddresses {
connSpecAddressesHostMap[connSpecAddress.Host] = struct{}{}
}

// Recreate the map to forget about previous clustermap information.
externalAlternateAddresses = make(map[string]string, len(pool.Nodes))
for _, node := range pool.Nodes {
if external, ok := node.AlternateNames["external"]; ok && external.Hostname != "" {
nodeHostname, _, err := SplitHostPort(node.Hostname)
if err != nil {
return nil, err
}
externalAlternateAddresses = make(map[string]string, len(poolServices.NodesExt))
for _, node := range poolServices.NodesExt {

if _, ok := connSpecAddressesHostMap[node.Hostname]; ok {
// Found default hostname in connSpec - abort all alternate address behaviour.
// The client MUST use the default/internal network.
externalAlternateAddresses = nil
break
}

// only try to map external alternate addresses if a hostname is present
if external, ok := node.AlternateNames["external"]; ok && external.Hostname != "" {
var port string
if bucketSpecTLS {
if extPort, ok := external.Ports["kvSSL"]; ok {
port = ":" + strconv.Itoa(extPort)
extPort, ok := external.Ports["kvSSL"]
if !ok {
TracefCtx(loggingCtx, KeyDCP, "kvSSL port was not exposed for external alternate address. Don't remap this node.")
continue
}

// found exposed kvSSL port, use when connecting
port = ":" + strconv.Itoa(extPort)
DebugfCtx(loggingCtx, KeyDCP, "Storing alternate address for kvSSL: %s => %s", MD(node.Hostname), MD(external.Hostname+port))
} else {
if extPort, ok := external.Ports["kv"]; ok {
port = ":" + strconv.Itoa(extPort)
extPort, ok := external.Ports["kv"]
if !ok {
TracefCtx(loggingCtx, KeyDCP, "kv port was not exposed for external alternate address. Skipping remapping of this node.")
continue
}

// found exposed kv port, use when connecting
port = ":" + strconv.Itoa(extPort)
DebugfCtx(loggingCtx, KeyDCP, "Storing alternate address for kv: %s => %s", MD(node.Hostname), MD(external.Hostname+port))
}

Tracef(KeyDCP, "Alternate address mapping %s => %s", MD(nodeHostname), MD(external.Hostname+port))
externalAlternateAddresses[nodeHostname] = external.Hostname + port
externalAlternateAddresses[node.Hostname] = external.Hostname + port
}
}

pool, err := client.GetPool(poolName)
if err != nil {
return nil, err
}

bucket, err := pool.GetBucket(bucketName)
if err != nil {
return nil, err
Expand All @@ -653,9 +680,9 @@ func alternateAddressShims(bucketSpecTLS bool) (

// Copy of cbdatasource's default Connect function, which swaps the given destination, with external addresses we found in ConnectBucket.
connectShim = func(protocol, dest string) (client *memcached.Client, err error) {
Tracef(KeyDCP, "Connect callback: %s %s", protocol, MD(dest))
TracefCtx(loggingCtx, KeyDCP, "Connect callback: %s %s", protocol, MD(dest))

dest, err = getExternalAlternateAddress(externalAlternateAddresses, dest)
dest, err = getExternalAlternateAddress(loggingCtx, externalAlternateAddresses, dest)
if err != nil {
return nil, err
}
Expand All @@ -665,9 +692,9 @@ func alternateAddressShims(bucketSpecTLS bool) (

// Copy of cbdatasource's default ConnectTLS function, which swaps the given destination, with external addresses we found in ConnectBucket.
connectTLSShim = func(protocol, dest string, tlsConfig *tls.Config) (client *memcached.Client, err error) {
Tracef(KeyDCP, "ConnectTLS callback: %s %s", protocol, MD(dest))
TracefCtx(loggingCtx, KeyDCP, "ConnectTLS callback: %s %s", protocol, MD(dest))

dest, err = getExternalAlternateAddress(externalAlternateAddresses, dest)
dest, err = getExternalAlternateAddress(loggingCtx, externalAlternateAddresses, dest)
if err != nil {
return nil, err
}
Expand Down
57 changes: 57 additions & 0 deletions base/dcp_common_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package base

import (
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestGetExternalAlternateAddress(t *testing.T) {
tests := []struct {
name string
dest string
altAddrMap map[string]string
expectedDest string
}{
{
name: "no alts",
dest: "node1.cbs.example.org:1234",
expectedDest: "node1.cbs.example.org:1234",
},
{
name: "non-matching alt",
dest: "node1.cbs.example.org:1234",
altAddrMap: map[string]string{
"node9.cbs.example.org": "10.10.10.9:5678",
},
expectedDest: "node1.cbs.example.org:1234",
},
{
name: "matching alt, alt connect URL",
dest: "node1.cbs.example.org:1234",
altAddrMap: map[string]string{
"node1.cbs.example.org": "10.10.10.1:5678",
},
expectedDest: "10.10.10.1:5678",
},
{
name: "matching alt multiple",
dest: "node2.cbs.example.org:1234",
altAddrMap: map[string]string{
"node1.cbs.example.org": "10.10.10.1:5678",
"node2.cbs.example.org": "10.10.10.2:5678",
"node3.cbs.example.org": "10.10.10.3:5678",
},
expectedDest: "10.10.10.2:5678",
},
}

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
newDest, err := getExternalAlternateAddress(nil, test.altAddrMap, test.dest)
require.NoError(t, err)
assert.Equal(t, test.expectedDest, newDest)
})
}
}
2 changes: 1 addition & 1 deletion base/dcp_dest.go
Original file line number Diff line number Diff line change
Expand Up @@ -365,7 +365,7 @@ func StartCbgtCbdatasourceFeed(bucket Bucket, spec BucketSpec, args sgbucket.Fee
indexName := feedName

// cbdatasource expects server URL in http format
serverURLs, errConvertServerSpec := CouchbaseURIToHttpURL(bucket, spec.Server)
serverURLs, errConvertServerSpec := CouchbaseURIToHttpURL(bucket, spec.Server, nil)
if errConvertServerSpec != nil {
return errConvertServerSpec
}
Expand Down
15 changes: 10 additions & 5 deletions base/dcp_receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,12 @@ import (
"errors"
"expvar"

"github.com/couchbase/go-couchbase"
"github.com/couchbase/go-couchbase/cbdatasource"
"github.com/couchbase/gomemcached"
sgbucket "github.com/couchbase/sg-bucket"
pkgerrors "github.com/pkg/errors"

"github.com/couchbase/go-couchbase"
"github.com/couchbase/go-couchbase/cbdatasource"
"gopkg.in/couchbaselabs/gocbconnstr.v1"
)

// Memcached binary protocol datatype bit flags (https://github.com/couchbase/memcached/blob/master/docs/BinaryProtocol.md#data-types),
Expand Down Expand Up @@ -211,9 +211,14 @@ func (nph NoPasswordAuthHandler) GetCredentials() (username string, password str
// bucket is using, and it uses the go-couchbase cbdatasource DCP abstraction layer
func StartDCPFeed(bucket Bucket, spec BucketSpec, args sgbucket.FeedArguments, callback sgbucket.FeedEventCallbackFunc, dbStats *expvar.Map) error {

connSpec, err := gocbconnstr.Parse(spec.Server)
if err != nil {
return err
}

// Recommended usage of cbdatasource is to let it manage it's own dedicated connection, so we're not
// reusing the bucket connection we've already established.
urls, errConvertServerSpec := CouchbaseURIToHttpURL(bucket, spec.Server)
urls, errConvertServerSpec := CouchbaseURIToHttpURL(bucket, spec.Server, &connSpec)
if errConvertServerSpec != nil {
return errConvertServerSpec
}
Expand Down Expand Up @@ -293,7 +298,7 @@ func StartDCPFeed(bucket Bucket, spec BucketSpec, args sgbucket.FeedArguments, c
}

// A lookup of host dest to external alternate address hostnames
dataSourceOptions.ConnectBucket, dataSourceOptions.Connect, dataSourceOptions.ConnectTLS = alternateAddressShims(spec.IsTLS())
dataSourceOptions.ConnectBucket, dataSourceOptions.Connect, dataSourceOptions.ConnectTLS = alternateAddressShims(loggingCtx, spec.IsTLS(), connSpec.Addresses)

DebugfCtx(loggingCtx, KeyDCP, "Connecting to new bucket datasource. URLs:%s, pool:%s, bucket:%s", MD(urls), MD(poolName), MD(bucketName))

Expand Down
28 changes: 17 additions & 11 deletions base/dcp_sharded.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/couchbase/go-couchbase"
"github.com/couchbase/go-couchbase/cbdatasource"
"github.com/pkg/errors"
"gopkg.in/couchbaselabs/gocbconnstr.v1"
)

const CBGTIndexTypeSyncGatewayImport = "syncGateway-import-"
Expand Down Expand Up @@ -76,7 +77,7 @@ func cbgtFeedParams(spec BucketSpec) (string, error) {
// TODO: If the index definition already exists in the cfg, it appears like this step can be bypassed,
// as we don't currently have a scenario where we want to update an existing index def. Needs
// additional testing to validate.
func createCBGTIndex(manager *cbgt.Manager, dbName string, bucket Bucket, spec BucketSpec, numPartitions uint16) error {
func createCBGTIndex(c *CbgtContext, dbName string, bucket Bucket, spec BucketSpec, numPartitions uint16) error {

// sourceType is based on cbgt.source_gocouchbase non-public constant.
// TODO: Request that cbgt make this and source_gocb public.
Expand Down Expand Up @@ -115,8 +116,8 @@ func createCBGTIndex(manager *cbgt.Manager, dbName string, bucket Bucket, spec B
// TODO: If this isn't well-formed JSON, cbgt emits errors when opening locally persisted pindex files. Review
// how this can be optimized if we're not actually using it in the indexImpl
indexParams := `{"name": "` + dbName + `"}`

indexName := dbName + "_import"
InfofCtx(c.Cfg.loggingCtx, KeyDCP, "Creating cbgt index %q for db %q", indexName, MD(dbName))

// Required for initial pools request, before BucketDataSourceOptions kick in
if spec.Certpath != "" {
Expand All @@ -126,20 +127,25 @@ func createCBGTIndex(manager *cbgt.Manager, dbName string, bucket Bucket, spec B
couchbase.SetSkipVerify(false)
}

connSpec, err := gocbconnstr.Parse(spec.Server)
if err != nil {
return err
}

// Register bucketDataSource callback for new index if we need to configure TLS
cbgt.RegisterBucketDataSourceOptionsCallback(indexName, manager.UUID(), func(options *cbdatasource.BucketDataSourceOptions) *cbdatasource.BucketDataSourceOptions {
cbgt.RegisterBucketDataSourceOptionsCallback(indexName, c.Manager.UUID(), func(options *cbdatasource.BucketDataSourceOptions) *cbdatasource.BucketDataSourceOptions {
if spec.IsTLS() {
options.TLSConfig = func() *tls.Config {
return spec.TLSConfig()
}
}
options.ConnectBucket, options.Connect, options.ConnectTLS = alternateAddressShims(spec.IsTLS())
options.ConnectBucket, options.Connect, options.ConnectTLS = alternateAddressShims(c.Cfg.loggingCtx, spec.IsTLS(), connSpec.Addresses)
return options
})

_, previousIndexUUID, err := getCBGTIndexUUID(manager, indexName)
_, previousIndexUUID, err := getCBGTIndexUUID(c.Manager, indexName)
indexType := CBGTIndexTypeSyncGatewayImport + dbName
err = manager.CreateIndex(
err = c.Manager.CreateIndex(
sourceType, // sourceType
bucket.GetName(), // sourceName
bucketUUID, // sourceUUID
Expand All @@ -150,9 +156,9 @@ func createCBGTIndex(manager *cbgt.Manager, dbName string, bucket Bucket, spec B
planParams, // planParams
previousIndexUUID, // prevIndexUUID
)
manager.Kick("NewIndexesCreated")
c.Manager.Kick("NewIndexesCreated")

Infof(KeyDCP, "Initialized sharded DCP feed %s with %d partitions.", indexName, numPartitions)
InfofCtx(c.Cfg.loggingCtx, KeyDCP, "Initialized sharded DCP feed %s with %d partitions.", indexName, numPartitions)
return err

}
Expand Down Expand Up @@ -233,7 +239,7 @@ func initCBGTManager(dbName string, bucket Bucket, spec BucketSpec) (*CbgtContex
var serverURL string
if feedType == cbgtFeedType_cbdatasource {
// cbdatasource expects server URL in http format
serverURLs, errConvertServerSpec := CouchbaseURIToHttpURL(bucket, spec.Server)
serverURLs, errConvertServerSpec := CouchbaseURIToHttpURL(bucket, spec.Server, nil)
if errConvertServerSpec != nil {
return nil, errConvertServerSpec
}
Expand Down Expand Up @@ -293,7 +299,7 @@ func (c *CbgtContext) StartManager(dbName string, bucket Bucket, spec BucketSpec
}

// Add the index definition for this feed to the cbgt cfg, in case it's not already present.
err = createCBGTIndex(c.Manager, dbName, bucket, spec, numPartitions)
err = createCBGTIndex(c, dbName, bucket, spec, numPartitions)
if err != nil {
Errorf("cbgt index creation failed: %v", err)
return err
Expand All @@ -309,7 +315,7 @@ func initCfgCB(bucket Bucket, spec BucketSpec) (*cbgt.CfgCB, error) {
options := map[string]interface{}{
"keyPrefix": SyncPrefix,
}
urls, errConvertServerSpec := CouchbaseURIToHttpURL(bucket, spec.Server)
urls, errConvertServerSpec := CouchbaseURIToHttpURL(bucket, spec.Server, nil)
if errConvertServerSpec != nil {
return nil, errConvertServerSpec
}
Expand Down
24 changes: 17 additions & 7 deletions base/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -704,10 +704,11 @@ func BoolPtr(b bool) *bool {
return &b
}

// Convert a Couchbase URI (eg, couchbase://host1,host2) to a list of HTTP URLs with ports (eg, ["http://host1:8091", "http://host2:8091"])
// Convert a Bucket, or a Couchbase URI (eg, couchbase://host1,host2) to a list of HTTP URLs with ports (eg, ["http://host1:8091", "http://host2:8091"])
// connSpec can be optionally passed in if available, to prevent unnecessary double-parsing of connstr
// Primary use case is for backwards compatibility with go-couchbase, cbdatasource, and CBGT. Supports secure URI's as well (couchbases://).
// Related CBGT ticket: https://issues.couchbase.com/browse/MB-25522
func CouchbaseURIToHttpURL(bucket Bucket, couchbaseUri string) (httpUrls []string, err error) {
func CouchbaseURIToHttpURL(bucket Bucket, couchbaseUri string, connSpec *gocbconnstr.ConnSpec) (httpUrls []string, err error) {

// If we're using a gocb bucket, use the bucket to retrieve the mgmt endpoints. Note that incoming bucket may be CouchbaseBucketGoCB or *CouchbaseBucketGoCB.
switch typedBucket := bucket.(type) {
Expand All @@ -728,12 +729,21 @@ func CouchbaseURIToHttpURL(bucket Bucket, couchbaseUri string) (httpUrls []strin
return []string{singleHttpUrl}, nil
}

// Unable to do simple URL parse, try to parse into components w/ gocbconnstr
connSpec, errParse := gocbconnstr.Parse(couchbaseUri)
if errParse != nil {
return httpUrls, pkgerrors.WithStack(RedactErrorf("Error parsing gocb connection string: %v. Error: %v", MD(couchbaseUri), errParse))
// Parse the given URI if we've not already got a connSpec
if connSpec == nil {
// Unable to do simple URL parse, try to parse into components w/ gocbconnstr
newConnSpec, errParse := gocbconnstr.Parse(couchbaseUri)
if errParse != nil {
return httpUrls, pkgerrors.WithStack(RedactErrorf("Error parsing gocb connection string: %v. Error: %v", MD(couchbaseUri), errParse))
}
connSpec = &newConnSpec
}

return connSpecToHTTPURLs(*connSpec)
}

func connSpecToHTTPURLs(connSpec gocbconnstr.ConnSpec) (httpUrls []string, err error) {

for _, address := range connSpec.Addresses {

// Determine port to use for management API
Expand All @@ -745,7 +755,7 @@ func CouchbaseURIToHttpURL(bucket Bucket, couchbaseUri string) (httpUrls []strin
case "couchbase":
fallthrough
case "couchbases":
return nil, RedactErrorf("couchbase:// and couchbases:// URI schemes can only be used with GoCB buckets. Bucket: %+v", MD(bucket))
return nil, RedactErrorf("couchbase:// and couchbases:// URI schemes can only be used with GoCB buckets.")
case "https":
translatedScheme = "https"
}
Expand Down
Loading

0 comments on commit a08bf70

Please sign in to comment.