From cf7cfd2fde3c011aa0ca96981bc636f55dd48d4e Mon Sep 17 00:00:00 2001
From: Varun Velamuri
Date: Fri, 18 Feb 2022 21:02:26 +0530
Subject: [PATCH 1/2] MB-100 Sleep for 2 seconds after rebalance to allow
 cleanup to finish

This sleep prevents the next test from going ahead and creating indexes
while rebalance cleanup is still in progress. Rebalance cleanup can take
up to 1 second on the source or destination nodes after the master
decides that rebalance is done, because waitForIndexBuild sleeps for up
to 1 second before it can observe the closure of the r.done channel.

Change-Id: Ided29d2f596da63b4860cc4bfb5ec2d215dfd668
---
 .../functionaltests/set14_rebalance_test.go   | 18 +++++++++++++++---
 1 file changed, 15 insertions(+), 3 deletions(-)

diff --git a/secondary/tests/functionaltests/set14_rebalance_test.go b/secondary/tests/functionaltests/set14_rebalance_test.go
index 01c5a54bb..809052979 100644
--- a/secondary/tests/functionaltests/set14_rebalance_test.go
+++ b/secondary/tests/functionaltests/set14_rebalance_test.go
@@ -207,6 +207,7 @@ func TestCreateIndexesBeforeRebalance(t *testing.T) {
 // Ending config: [0: kv n1ql] [1: index] [2: index] [3: index]
 func TestIndexNodeRebalanceIn(t *testing.T) {
 	addTwoNodesAndRebalance("TestIndexNodeRebalanceIn", t)
+	waitForRebalanceCleanup()
 }
 
 // addTwoNodesAndRebalance is the delegate of two tests that perform the same actions at different points
@@ -318,6 +319,7 @@ func TestIndexNodeRebalanceOut(t *testing.T) {
 	}
 
 	printClusterConfig(method, "exit")
+	waitForRebalanceCleanup()
 }
 
 // TestFailoverAndRebalance fails over node [2: index] from the cluster and rebalances.
@@ -344,6 +346,7 @@ func TestFailoverAndRebalance(t *testing.T) {
 	}
 
 	printClusterConfig(method, "exit")
+	waitForRebalanceCleanup()
 }
 
 // TestSwapRebalance adds node [1: index] to the cluster (without rebalancing), then rebalances out
@@ -372,6 +375,7 @@ func TestSwapRebalance(t *testing.T) {
 	}
 
 	printClusterConfig(method, "exit")
+	waitForRebalanceCleanup()
 }
 
 // TestRebalanceReplicaRepair adds nodes [2: index] and [3: index], then rebalances. The actions performed are
@@ -381,6 +385,7 @@ func TestSwapRebalance(t *testing.T) {
 // (Same as TestIndexNodeRebalanceIn.)
 func TestRebalanceReplicaRepair(t *testing.T) {
 	addTwoNodesAndRebalance("TestRebalanceReplicaRepair", t)
+	waitForRebalanceCleanup()
 }
 
 // TestRebalanceResetCluster restores indexer.settings.rebalance.redistribute_indexes = false
@@ -407,8 +412,15 @@ func TestRebalanceResetCluster(t *testing.T) {
 		t.Fatalf("%v: Unexpected cluster configuration: %v", method, status)
 	}
 	printClusterConfig(method, "exit")
+	waitForRebalanceCleanup()
 }
 
-// NOTE: Make sure that the last test in this file resets the cluster configuration
-// by calling setupCluster so that it doesn't break any other tests that run after
-// the tests in this file
+func waitForRebalanceCleanup() {
+	// This sleep is to prevent the next test from going ahead and creating
+	// indexes while rebalance cleanup is in progress (Rebalance cleanup
+	// can take up to 1 second on source or destination nodes after the
+	// master decides that rebalance is done.
This is because, + // waitForIndexBuild sleeps for upto 1 second before it can read the + // closure of r.done channel) + time.Sleep(2 * time.Second) +} From f8a88e57b2ecaa72611fa9340bc73ad71eadfd33 Mon Sep 17 00:00:00 2001 From: Amit Kulkarni Date: Mon, 21 Feb 2022 11:12:09 +0530 Subject: [PATCH 2/2] MB-51103: Force queryport client auth on AuthResponse retry Change-Id: I71ad4e19975e7fbafab66dec3957270347b27345 --- secondary/queryport/client/client.go | 17 +++++++++++----- secondary/queryport/client/conn_pool.go | 13 ++++++++---- secondary/queryport/client/conn_pool_test.go | 21 +++++++++++++------- secondary/queryport/client/scan_client.go | 11 +++++++--- 4 files changed, 43 insertions(+), 19 deletions(-) diff --git a/secondary/queryport/client/client.go b/secondary/queryport/client/client.go index 0fc40cc5e..858f6e19f 100644 --- a/secondary/queryport/client/client.go +++ b/secondary/queryport/client/client.go @@ -398,6 +398,7 @@ type GsiClient struct { scanResponse int64 dataEncFmt uint32 qcLock sync.Mutex + needsAuth *uint32 } // NewGsiClient returns client to access GSI cluster. @@ -1461,7 +1462,7 @@ func (c *GsiClient) updateScanClients() { if qc.IsClosed() { logging.Infof("Found a closed scanclient for %v. Initializing a new scan client.", queryport) - if qc, err := NewGsiScanClient(queryport, c.cluster, c.config); err == nil { + if qc, err := NewGsiScanClient(queryport, c.cluster, c.config, c.needsAuth); err == nil { clients[queryport] = qc } else { logging.Errorf("Unable to initialize gsi scanclient (%v)", err) @@ -1475,7 +1476,7 @@ func (c *GsiClient) updateScanClients() { } for queryport := range newclients { - if qc, err := NewGsiScanClient(queryport, c.cluster, c.config); err == nil { + if qc, err := NewGsiScanClient(queryport, c.cluster, c.config, c.needsAuth); err == nil { clients[queryport] = qc } else { logging.Errorf("Unable to initialize gsi scanclient (%v)", err) @@ -1752,9 +1753,12 @@ func (c *GsiClient) getBucketHash(bucketn string) (uint64, bool) { func makeWithCbq(cluster string, config common.Config, encryptLocalHost bool) (*GsiClient, error) { var err error + var needsAuth uint32 + c := &GsiClient{ - cluster: cluster, - config: config, + cluster: cluster, + config: config, + needsAuth: &needsAuth, } if err := c.initSecurityContext(encryptLocalHost); err != nil { @@ -1769,7 +1773,7 @@ func makeWithCbq(cluster string, config common.Config, encryptLocalHost bool) (* } clients := make(map[string]*GsiScanClient) for _, queryport := range c.bridge.GetScanports() { - if qc, err := NewGsiScanClient(queryport, c.cluster, config); err == nil { + if qc, err := NewGsiScanClient(queryport, c.cluster, config, c.needsAuth); err == nil { clients[queryport] = qc } } @@ -1786,6 +1790,8 @@ func makeWithMetaProvider( return nil, err } + var needsAuth uint32 + c = &GsiClient{ cluster: cluster, config: config, @@ -1793,6 +1799,7 @@ func makeWithMetaProvider( metaCh: make(chan bool, 1), settings: NewClientSettings(needRefresh), killch: make(chan bool, 1), + needsAuth: &needsAuth, } if err := c.initSecurityContext(encryptLocalHost); err != nil { diff --git a/secondary/queryport/client/conn_pool.go b/secondary/queryport/client/conn_pool.go index 734b1916f..efc085cca 100644 --- a/secondary/queryport/client/conn_pool.go +++ b/secondary/queryport/client/conn_pool.go @@ -50,6 +50,7 @@ type connectionPool struct { kaInterval time.Duration authHost string cluster string + needsAuth *uint32 } type connection struct { @@ -68,7 +69,7 @@ func newConnectionPool( poolSize, poolOverflow, maxPayload 
int, timeout, availTimeout time.Duration, minPoolSizeWM int32, relConnBatchSize int32, kaInterval int, - cluster string) *connectionPool { + cluster string, needsAuth *uint32) *connectionPool { cp := &connectionPool{ host: host, @@ -83,6 +84,7 @@ func newConnectionPool( stopCh: make(chan bool, 1), kaInterval: time.Duration(kaInterval) * time.Second, cluster: cluster, + needsAuth: needsAuth, } // Ignore the error in initHostportForAuth, if any. @@ -138,9 +140,12 @@ func (cp *connectionPool) defaultMkConn(host string) (*connection, error) { func (cp *connectionPool) doAuth(conn *connection) error { // Check if auth is supported / configured before doing auth - if common.GetClusterVersion() < common.INDEXER_71_VERSION { - logging.Verbosef("%v doAuth Auth is not needed for connection (%v,%v)", - cp.logPrefix, conn.conn.LocalAddr(), conn.conn.RemoteAddr()) + clustVer := common.GetClusterVersion() + needsAuth := atomic.LoadUint32(cp.needsAuth) + + if clustVer < common.INDEXER_71_VERSION && needsAuth == 0 { + logging.Verbosef("%v doAuth Auth is not needed for connection (%v,%v) clustVer %v, needsAuth %v ", + cp.logPrefix, conn.conn.LocalAddr(), conn.conn.RemoteAddr(), clustVer, needsAuth) return nil } diff --git a/secondary/queryport/client/conn_pool_test.go b/secondary/queryport/client/conn_pool_test.go index c46d99c7a..e31880370 100644 --- a/secondary/queryport/client/conn_pool_test.go +++ b/secondary/queryport/client/conn_pool_test.go @@ -63,7 +63,8 @@ func TestConnPoolBasicSanity(t *testing.T) { go ts.initServer(host, tsStopCh) time.Sleep(1 * time.Second) - cp := newConnectionPool(host, 3, 6, 1024*1024, readDeadline, writeDeadline, 3, 1, 1, "") + var needsAuth uint32 + cp := newConnectionPool(host, 3, 6, 1024*1024, readDeadline, writeDeadline, 3, 1, 1, "", &needsAuth) cp.mkConn = testMkConn seenClients := map[*connection]bool{} @@ -106,7 +107,8 @@ func TestConnRelease(t *testing.T) { go ts.initServer(host, tsStopCh) time.Sleep(1 * time.Second) - cp := newConnectionPool(host, 500, 10, 1024*1024, readDeadline, writeDeadline, 40, 10, 1, "") + var needsAuth uint32 + cp := newConnectionPool(host, 500, 10, 1024*1024, readDeadline, writeDeadline, 40, 10, 1, "", &needsAuth) cp.mkConn = testMkConn seenClients := map[*connection]bool{} @@ -179,7 +181,8 @@ func TestLongevity(t *testing.T) { go ts.initServer(host, tsStopCh) time.Sleep(1 * time.Second) - cp := newConnectionPool(host, 500, 10, 1024*1024, readDeadline, writeDeadline, 40, 10, 1, "") + var needsAuth uint32 + cp := newConnectionPool(host, 500, 10, 1024*1024, readDeadline, writeDeadline, 40, 10, 1, "", &needsAuth) cp.mkConn = testMkConn // Get 240 Connections. 
@@ -302,7 +305,8 @@ func TestSustainedHighConns(t *testing.T) { go ts.initServer(host, tsStopCh) time.Sleep(1 * time.Second) - cp := newConnectionPool(host, 500, 10, 1024*1024, readDeadline, writeDeadline, 40, 10, 1, "") + var needsAuth uint32 + cp := newConnectionPool(host, 500, 10, 1024*1024, readDeadline, writeDeadline, 40, 10, 1, "", &needsAuth) cp.mkConn = testMkConn ch := make(chan *connection, 1000) @@ -345,7 +349,8 @@ func TestLowWM(t *testing.T) { go ts.initServer(host, tsStopCh) time.Sleep(1 * time.Second) - cp := newConnectionPool(host, 20, 5, 1024*1024, readDeadline, writeDeadline, 10, 2, 1, "") + var needsAuth uint32 + cp := newConnectionPool(host, 20, 5, 1024*1024, readDeadline, writeDeadline, 10, 2, 1, "", &needsAuth) cp.mkConn = testMkConn seenClients := map[*connection]bool{} @@ -400,7 +405,8 @@ func TestTotalConns(t *testing.T) { go ts.initServer(host, tsStopCh) time.Sleep(1 * time.Second) - cp := newConnectionPool(host, 120, 5, 1024*1024, readDeadline, writeDeadline, 10, 10, 1, "") + var needsAuth uint32 + cp := newConnectionPool(host, 120, 5, 1024*1024, readDeadline, writeDeadline, 10, 10, 1, "", &needsAuth) cp.mkConn = testMkConn seenClients := map[*connection]bool{} @@ -484,7 +490,8 @@ func TestUpdateTickRate(t *testing.T) { go ts.initServer(host, tsStopCh) time.Sleep(1 * time.Second) - cp := newConnectionPool(host, 40, 5, 1024*1024, readDeadline, writeDeadline, 2, 2, 1, "") + var needsAuth uint32 + cp := newConnectionPool(host, 40, 5, 1024*1024, readDeadline, writeDeadline, 2, 2, 1, "", &needsAuth) cp.mkConn = testMkConn seenClients := map[*connection]bool{} diff --git a/secondary/queryport/client/scan_client.go b/secondary/queryport/client/scan_client.go index 81df6d453..6fc8a0805 100644 --- a/secondary/queryport/client/scan_client.go +++ b/secondary/queryport/client/scan_client.go @@ -42,9 +42,10 @@ type GsiScanClient struct { serverVersion uint32 closed uint32 + needsAuth *uint32 } -func NewGsiScanClient(queryport, cluster string, config common.Config) (*GsiScanClient, error) { +func NewGsiScanClient(queryport, cluster string, config common.Config, needsAuth *uint32) (*GsiScanClient, error) { t := time.Duration(config["connPoolAvailWaitTimeout"].Int()) c := &GsiScanClient{ queryport: queryport, @@ -58,11 +59,12 @@ func NewGsiScanClient(queryport, cluster string, config common.Config) (*GsiScan logPrefix: fmt.Sprintf("[GsiScanClient:%q]", queryport), minPoolSizeWM: int32(config["settings.minPoolSizeWM"].Int()), relConnBatchSize: int32(config["settings.relConnBatchSize"].Int()), + needsAuth: needsAuth, } c.pool = newConnectionPool( queryport, c.poolSize, c.poolOverflow, c.maxPayload, c.cpTimeout, c.cpAvailWaitTimeout, c.minPoolSizeWM, c.relConnBatchSize, config["keepAliveInterval"].Int(), - cluster) + cluster, needsAuth) logging.Infof("%v started ...\n", c.logPrefix) if version, err := c.Helo(); err == nil || err == io.EOF { @@ -1311,11 +1313,12 @@ REQUEST_RESPONSE_RETRY: // the upgraded cluster version. From this point onwards, the client will // start using auth for all new connections. + atomic.StoreUint32(c.needsAuth, uint32(1)) + if rsp.GetCode() == transport.AUTH_MISSING && !authRetry { // Do not count this as a "retry" logging.Infof("%v server needs authentication information. 
Retrying "+ "request with auth req(%v)", c.logPrefix, requestId) - // TODO: Update cluster version renew() authRetry = true goto REQUEST_RESPONSE_RETRY @@ -1400,6 +1403,8 @@ func (c *GsiScanClient) streamResponse( // When the cluster upgrade completes and the queryport starts supporting // auth. See doRequestResponse for more details. + atomic.StoreUint32(c.needsAuth, uint32(1)) + if rsp.GetCode() == transport.AUTH_MISSING { // Do not count this as a "retry" logging.Infof("%v server needs authentication information. Retrying "+