Skip to content

Commit

Permalink
MB-46760 - remove compression validation for replication settings change
Browse files Browse the repository at this point in the history
Given that Couchbase versions below 5.5 are now EOL and no longer supported,
all supported Couchbase versions support Snappy compression, so these
validation checks are no longer needed.

Change-Id: Iedf0a366b3c4e8ae5e0fb0183073343afde9db07
Reviewed-on: http://review.couchbase.org/c/goxdcr/+/155137
Well-Formed: Restriction Checker
Reviewed-by: Lilei Chen <lilei.chen@couchbase.com>
Reviewed-by: Neil Huang <neil.huang@couchbase.com>
Tested-by: Neil Huang <neil.huang@couchbase.com>
  • Loading branch information
nelio2k committed Jun 7, 2021
1 parent 81ec2b0 commit 8d3a6a5
Show file tree
Hide file tree
Showing 2 changed files with 2 additions and 164 deletions.
106 changes: 2 additions & 104 deletions metadata_svc/replication_spec_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"sync"
"time"

mcc "github.com/couchbase/gomemcached/client"
"github.com/couchbase/goxdcr/base"
"github.com/couchbase/goxdcr/log"
"github.com/couchbase/goxdcr/metadata"
Expand Down Expand Up @@ -519,31 +518,10 @@ func (service *ReplicationSpecService) validateCompression(errorMap base.ErrorMa
if len(errorMap) > 0 || err != nil {
return err
}


thisNodeContainsKV, kvCheckErr := service.xdcr_comp_topology_svc.IsKVNode()
if kvCheckErr != nil {
service.logger.Warnf("Checking for KV capability got err: %v", kvCheckErr)
}
if kvCheckErr == nil && thisNodeContainsKV {
err = service.validateCompressionLocal(errorMap, sourceBucket, targetBucket, errKey, requestedFeaturesSet)
if len(errorMap) > 0 || err != nil {
return err
}
}

// Need to validate each node of the target to make sure all of them can support compression
for i := 0; i < len(allKvConnStrs); i++ {
err = service.validateCompressionTarget(errorMap, sourceBucket, targetClusterRef, targetBucket, allKvConnStrs[i], username, password,
httpAuthMech, certificate, SANInCertificate, clientCertificate, clientKey, errKey, requestedFeaturesSet)
if len(errorMap) > 0 || err != nil {
return err
}
}

return err
return nil
}

// Prereq compression check is cheap
func (service *ReplicationSpecService) validateCompressionPreReq(errorMap base.ErrorMap, targetBucket string, targetBucketInfo map[string]interface{}, compressionType int,
errKey string) error {
var err error
Expand Down Expand Up @@ -573,86 +551,6 @@ func (service *ReplicationSpecService) validateCompressionPreReq(errorMap base.E
return err
}

// validateCompressionLocal verifies that this node's local memcached endpoint
// supports the compression type requested in requestedFeaturesSet. It opens a
// short-lived memcached connection to the local node, negotiates features via
// HELO, and compares the responded compression type against the requested one.
// A mismatch is recorded as a user-facing entry in errorMap under errKey; the
// returned error is reserved for address-lookup or connection failures.
func (service *ReplicationSpecService) validateCompressionLocal(errorMap base.ErrorMap, sourceBucket string, targetBucket string, errKey string,
	requestedFeaturesSet utilities.HELOFeatures) error {
	localAddr, err := service.xdcr_comp_topology_svc.MyMemcachedAddr()
	if err != nil {
		return err
	}
	localClient, respondedFeatures, err := service.utils.GetMemcachedConnectionWFeatures(localAddr, sourceBucket,
		base.ComposeUserAgentWithBucketNames("Goxdcr ReplSpecSvc", sourceBucket, targetBucket), base.KeepAlivePeriod, requestedFeaturesSet, service.logger)
	// The connection is only needed for feature negotiation; close it as soon
	// as the HELO response has been captured (even if err is non-nil, a non-nil
	// client must still be closed).
	if localClient != nil {
		localClient.Close()
	}
	if err != nil {
		return err
	}
	// A compression mismatch is a validation failure reported via errorMap, not
	// an internal error — err is nil on this path, so nil is returned.
	if respondedFeatures.CompressionType != requestedFeaturesSet.CompressionType {
		errorMap[errKey] = fmt.Errorf("Source cluster %v does not support %v compression. Please verify the configuration on the source cluster for %v compression support, or disable compression for this replication.",
			localAddr, base.CompressionTypeStrings[requestedFeaturesSet.CompressionType], base.CompressionTypeStrings[requestedFeaturesSet.CompressionType])
		return err
	}
	return err
}

// validateCompressionTarget verifies that a single target KV node (kvConnStr)
// supports the compression type requested in requestedFeaturesSet. Depending on
// the remote cluster reference's encryption mode it dials either a TLS
// connection (full encryption) or a raw memcached connection, negotiates
// features via HELO, and compares the responded compression type against the
// requested one. A mismatch is recorded as a user-facing entry in errorMap
// (keyed under base.ToCluster — note: not errKey); the returned error is
// reserved for connection/lookup failures.
func (service *ReplicationSpecService) validateCompressionTarget(errorMap base.ErrorMap, sourceBucket string, targetClusterRef *metadata.RemoteClusterReference, targetBucket string, kvConnStr, username, password string,
	httpAuthMech base.HttpAuthMech, certificate []byte, SANInCertificate bool, clientCertificate, clientKey []byte, errKey string, requestedFeaturesSet utilities.HELOFeatures) error {
	var conn mcc.ClientIface
	sslPortMap := make(base.SSLPortMap)
	var err error

	if targetClusterRef.IsFullEncryption() {
		// Full encryption needs TLS connection with special inputs
		var connStr string
		var useExternal bool
		connStr, err = targetClusterRef.MyConnectionStr()
		if err != nil {
			return err
		}
		useExternal, err = service.remote_cluster_svc.ShouldUseAlternateAddress(targetClusterRef)
		if err != nil {
			return err
		}
		// Look up each KV node's SSL port so the TLS connection can be dialed
		// directly to the requested node.
		sslPortMap, err = service.utils.GetMemcachedSSLPortMap(connStr, username, password, httpAuthMech, certificate, SANInCertificate, clientCertificate, clientKey, targetBucket, service.logger, useExternal)
		if err != nil {
			return err
		}
		sslPort, ok := sslPortMap[kvConnStr]
		if !ok {
			err = errors.New(fmt.Sprintf("Unable to populate sslPort using kvConnStr: %v sslPortMap: %v", kvConnStr, sslPortMap))
			return err
		}
		hostName := base.GetHostName(kvConnStr)
		sslConStr := base.GetHostAddr(hostName, sslPort)
		conn, err = base.NewTLSConn(sslConStr, username, password, certificate, SANInCertificate, clientCertificate, clientKey, targetBucket, service.logger)
	} else {
		// Half-Encryption should have already been verified via SCRAM-SHA check in validateXmem...
		// Unencrypted is also fine here with RawConn
		conn, err = service.utils.GetMemcachedRawConn(kvConnStr, username, password, targetBucket, !targetClusterRef.IsEncryptionEnabled() /*plainAuth*/, 0 /*keepAlivePeriod*/, service.logger)
	}
	// Close whichever connection was established; conn may be nil when the dial
	// itself failed, hence the nil guard inside the closure.
	connCloseFunc := func() {
		if conn != nil {
			conn.Close()
		}
	}
	defer connCloseFunc()
	if err != nil {
		return err
	}

	// Negotiate features with the target node and check what it responded with.
	respondedFeatures, err := service.utils.SendHELOWithFeatures(conn, base.ComposeUserAgentWithBucketNames("Goxdcr ReplSpecSvc", sourceBucket, targetBucket), base.HELOTimeout, base.HELOTimeout, requestedFeaturesSet, service.logger)
	if err != nil {
		return err
	}

	// A compression mismatch is a validation failure reported via errorMap; err
	// is nil on this path, so nil is returned.
	if respondedFeatures.CompressionType != requestedFeaturesSet.CompressionType {
		errorMap[base.ToCluster] = fmt.Errorf("Target cluster (node %v) does not support %v compression. Please verify the configuration on the target cluster for %v compression support, or disable compression for this replication.",
			kvConnStr, base.CompressionTypeStrings[requestedFeaturesSet.CompressionType], base.CompressionTypeStrings[requestedFeaturesSet.CompressionType])
		return err
	}
	return err
}

//validate target bucket
func (service *ReplicationSpecService) validateTargetBucket(errorMap base.ErrorMap, remote_connStr, targetBucket, remote_userName, remote_password string, httpAuthMech base.HttpAuthMech, certificate []byte, sanInCertificate bool, clientCertificate, clientKey []byte,
sourceBucket string, targetCluster string, useExternal bool) (targetBucketInfo map[string]interface{}, targetBucketUUID string, targetBucketNumberOfVBs int, targetConflictResolutionType string, targetKVVBMap map[string][]uint16) {
Expand Down
60 changes: 0 additions & 60 deletions metadata_svc/replication_spec_svc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -391,66 +391,6 @@ func TestCompressionNegNotEnterprise(t *testing.T) {
fmt.Println("============== Test case end: TestCompressionNegNotEnterprise =================")
}

// TestCompressionNegCAPI verifies that requesting Snappy compression on a
// CAPI-type replication is rejected with a validation error.
func TestCompressionNegCAPI(t *testing.T) {
	assert := assert.New(t)
	fmt.Println("============== Test case start: TestCompressionNegCAPI =================")
	topologyMock, metaSvcMock, uiLogMock, remClusterMock,
		clusterInfoMock, utilsMock, specSvc,
		srcBucket, tgtBucket, tgtCluster, replSettings, mcClientMock := setupBoilerPlate()

	// Wire up mocks for a non-enterprise, non-elastic setup where compression
	// negotiation does not pass.
	setupMocks(base.ConflictResolutionType_Seqno, base.ConflictResolutionType_Seqno,
		topologyMock, metaSvcMock, uiLogMock, remClusterMock,
		clusterInfoMock, utilsMock, specSvc, mcClientMock, false, /*Enterprise*/
		false /*IsElastic*/, false /*CompressionPass*/)

	// Turning Snappy on for a CAPI replication must be disallowed.
	replSettings[metadata.ReplicationTypeKey] = metadata.ReplicationTypeCapi
	replSettings[metadata.CompressionTypeKey] = base.CompressionTypeSnappy
	_, _, _, validationErrs, _, _ := specSvc.ValidateNewReplicationSpec(srcBucket, tgtCluster, tgtBucket, replSettings)
	assert.NotEqual(len(validationErrs), 0)

	fmt.Println("============== Test case end: TestCompressionNegCAPI =================")
}

// TestCompressionNegNoSnappy exercises compression validation when Snappy
// cannot be negotiated: disabling compression passes, explicit Snappy is
// rejected, and Auto is accepted with a warning on both the new-spec and
// settings-change validation paths.
func TestCompressionNegNoSnappy(t *testing.T) {
	assert := assert.New(t)
	fmt.Println("============== Test case start: TestCompressionNegNoSnappy =================")
	topologyMock, metaSvcMock, uiLogMock, remClusterMock,
		clusterInfoMock, utilsMock, specSvc,
		srcBucket, tgtBucket, tgtCluster, replSettings, mcClientMock := setupBoilerPlate()

	// Enterprise cluster, but compression negotiation set up to fail.
	setupMocks(base.ConflictResolutionType_Seqno, base.ConflictResolutionType_Seqno,
		topologyMock, metaSvcMock, uiLogMock, remClusterMock,
		clusterInfoMock, utilsMock, specSvc, mcClientMock, true, /*IsEnterprise*/
		false /*IsElastic*/, false /*CompressionPass*/)

	// Disabling compression is always allowed.
	replSettings[metadata.CompressionTypeKey] = base.CompressionTypeNone
	_, _, _, validationErrs, _, _ := specSvc.ValidateNewReplicationSpec(srcBucket, tgtCluster, tgtBucket, replSettings)
	assert.Equal(len(validationErrs), 0)

	// Explicit Snappy is no longer a valid input and must produce an error.
	replSettings[metadata.CompressionTypeKey] = base.CompressionTypeSnappy
	_, _, _, validationErrs, _, _ = specSvc.ValidateNewReplicationSpec(srcBucket, tgtCluster, tgtBucket, replSettings)
	assert.NotEqual(len(validationErrs), 0)

	// Auto degrades gracefully: a warning is raised but no error.
	replSettings[metadata.CompressionTypeKey] = base.CompressionTypeAuto
	_, _, _, validationErrs, _, warnings := specSvc.ValidateNewReplicationSpec(srcBucket, tgtCluster, tgtBucket, replSettings)
	assert.Equal(len(validationErrs), 0)
	assert.NotEqual(len(warnings), 0)

	// The settings-change validation path must accept Auto as well.
	replSettings[metadata.CompressionTypeKey] = base.CompressionTypeAuto
	settingsErrMap, err := specSvc.ValidateReplicationSettings(srcBucket, tgtCluster, tgtBucket, replSettings)
	errExists := len(settingsErrMap) > 0 || err != nil
	assert.False(errExists)

	fmt.Println("============== Test case end: TestCompressionNegNoSnappy =================")
}

func TestElasticSearch(t *testing.T) {
assert := assert.New(t)
fmt.Println("============== Test case start: TestElasticSearch =================")
Expand Down

0 comments on commit 8d3a6a5

Please sign in to comment.