diff --git a/secondary/queryport/client/client.go b/secondary/queryport/client/client.go
index 0fc40cc5e..858f6e19f 100644
--- a/secondary/queryport/client/client.go
+++ b/secondary/queryport/client/client.go
@@ -398,6 +398,7 @@ type GsiClient struct {
 	scanResponse int64
 	dataEncFmt   uint32
 	qcLock       sync.Mutex
+	needsAuth    *uint32
 }
 
 // NewGsiClient returns client to access GSI cluster.
@@ -1461,7 +1462,7 @@ func (c *GsiClient) updateScanClients() {
 		if qc.IsClosed() {
 			logging.Infof("Found a closed scanclient for %v. Initializing a new scan client.", queryport)
-			if qc, err := NewGsiScanClient(queryport, c.cluster, c.config); err == nil {
+			if qc, err := NewGsiScanClient(queryport, c.cluster, c.config, c.needsAuth); err == nil {
 				clients[queryport] = qc
 			} else {
 				logging.Errorf("Unable to initialize gsi scanclient (%v)", err)
@@ -1475,7 +1476,7 @@
 	}
 	for queryport := range newclients {
-		if qc, err := NewGsiScanClient(queryport, c.cluster, c.config); err == nil {
+		if qc, err := NewGsiScanClient(queryport, c.cluster, c.config, c.needsAuth); err == nil {
 			clients[queryport] = qc
 		} else {
 			logging.Errorf("Unable to initialize gsi scanclient (%v)", err)
@@ -1752,9 +1753,12 @@ func (c *GsiClient) getBucketHash(bucketn string) (uint64, bool) {
 func makeWithCbq(cluster string, config common.Config, encryptLocalHost bool) (*GsiClient, error) {
 	var err error
+	var needsAuth uint32
+
 	c := &GsiClient{
-		cluster: cluster,
-		config:  config,
+		cluster:   cluster,
+		config:    config,
+		needsAuth: &needsAuth,
 	}
 
 	if err := c.initSecurityContext(encryptLocalHost); err != nil {
@@ -1769,7 +1773,7 @@ func makeWithCbq(cluster string, config common.Config, encryptLocalHost bool) (*
 	}
 	clients := make(map[string]*GsiScanClient)
 	for _, queryport := range c.bridge.GetScanports() {
-		if qc, err := NewGsiScanClient(queryport, c.cluster, config); err == nil {
+		if qc, err := NewGsiScanClient(queryport, c.cluster, config, c.needsAuth); err == nil {
 			clients[queryport] = qc
 		}
 	}
@@ -1786,6 +1790,8 @@ func makeWithMetaProvider(
 		return nil, err
 	}
 
+	var needsAuth uint32
+
 	c = &GsiClient{
 		cluster:  cluster,
 		config:   config,
@@ -1793,6 +1799,7 @@
 		metaCh:   make(chan bool, 1),
 		settings: NewClientSettings(needRefresh),
 		killch:   make(chan bool, 1),
+		needsAuth: &needsAuth,
 	}
 
 	if err := c.initSecurityContext(encryptLocalHost); err != nil {
diff --git a/secondary/queryport/client/conn_pool.go b/secondary/queryport/client/conn_pool.go
index 734b1916f..efc085cca 100644
--- a/secondary/queryport/client/conn_pool.go
+++ b/secondary/queryport/client/conn_pool.go
@@ -50,6 +50,7 @@ type connectionPool struct {
 	kaInterval       time.Duration
 	authHost         string
 	cluster          string
+	needsAuth        *uint32
 }
 
 type connection struct {
@@ -68,7 +69,7 @@ func newConnectionPool(
 	poolSize, poolOverflow, maxPayload int,
 	timeout, availTimeout time.Duration,
 	minPoolSizeWM int32, relConnBatchSize int32, kaInterval int,
-	cluster string) *connectionPool {
+	cluster string, needsAuth *uint32) *connectionPool {
 
 	cp := &connectionPool{
 		host:             host,
@@ -83,6 +84,7 @@
 		stopCh:           make(chan bool, 1),
 		kaInterval:       time.Duration(kaInterval) * time.Second,
 		cluster:          cluster,
+		needsAuth:        needsAuth,
 	}
 	// Ignore the error in initHostportForAuth, if any.
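The needsAuth field added above is a pointer on purpose: GsiClient allocates one uint32 and hands the same address to every GsiScanClient and connectionPool it creates, so a single AUTH_MISSING observation can flip the behaviour of every pool at once. A minimal sketch of that shared-flag pattern, with illustrative names only (client, pool and newPool are not part of this patch):

    package main

    import (
        "fmt"
        "sync/atomic"
    )

    // pool stands in for connectionPool: it never owns the flag, it only
    // holds a pointer to the word owned by the client.
    type pool struct {
        needsAuth *uint32
    }

    // client stands in for GsiClient, the single owner of the flag.
    type client struct {
        needsAuth *uint32
    }

    func newClient() *client {
        var needsAuth uint32 // zero: auth not yet known to be required
        return &client{needsAuth: &needsAuth}
    }

    func (c *client) newPool() *pool {
        return &pool{needsAuth: c.needsAuth} // every pool shares the same word
    }

    func main() {
        c := newClient()
        p1, p2 := c.newPool(), c.newPool()

        // e.g. set once a queryport rejects an unauthenticated request
        atomic.StoreUint32(c.needsAuth, 1)

        // Both pools observe the change with no further plumbing.
        fmt.Println(atomic.LoadUint32(p1.needsAuth), atomic.LoadUint32(p2.needsAuth))
    }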
@@ -138,9 +140,12 @@ func (cp *connectionPool) defaultMkConn(host string) (*connection, error) {
 
 func (cp *connectionPool) doAuth(conn *connection) error {
 	// Check if auth is supported / configured before doing auth
-	if common.GetClusterVersion() < common.INDEXER_71_VERSION {
-		logging.Verbosef("%v doAuth Auth is not needed for connection (%v,%v)",
-			cp.logPrefix, conn.conn.LocalAddr(), conn.conn.RemoteAddr())
+	clustVer := common.GetClusterVersion()
+	needsAuth := atomic.LoadUint32(cp.needsAuth)
+
+	if clustVer < common.INDEXER_71_VERSION && needsAuth == 0 {
+		logging.Verbosef("%v doAuth Auth is not needed for connection (%v,%v) clustVer %v, needsAuth %v",
+			cp.logPrefix, conn.conn.LocalAddr(), conn.conn.RemoteAddr(), clustVer, needsAuth)
 		return nil
 	}
 
diff --git a/secondary/queryport/client/conn_pool_test.go b/secondary/queryport/client/conn_pool_test.go
index c46d99c7a..e31880370 100644
--- a/secondary/queryport/client/conn_pool_test.go
+++ b/secondary/queryport/client/conn_pool_test.go
@@ -63,7 +63,8 @@ func TestConnPoolBasicSanity(t *testing.T) {
 	go ts.initServer(host, tsStopCh)
 	time.Sleep(1 * time.Second)
 
-	cp := newConnectionPool(host, 3, 6, 1024*1024, readDeadline, writeDeadline, 3, 1, 1, "")
+	var needsAuth uint32
+	cp := newConnectionPool(host, 3, 6, 1024*1024, readDeadline, writeDeadline, 3, 1, 1, "", &needsAuth)
 	cp.mkConn = testMkConn
 
 	seenClients := map[*connection]bool{}
@@ -106,7 +107,8 @@ func TestConnRelease(t *testing.T) {
 	go ts.initServer(host, tsStopCh)
 	time.Sleep(1 * time.Second)
 
-	cp := newConnectionPool(host, 500, 10, 1024*1024, readDeadline, writeDeadline, 40, 10, 1, "")
+	var needsAuth uint32
+	cp := newConnectionPool(host, 500, 10, 1024*1024, readDeadline, writeDeadline, 40, 10, 1, "", &needsAuth)
 	cp.mkConn = testMkConn
 
 	seenClients := map[*connection]bool{}
@@ -179,7 +181,8 @@ func TestLongevity(t *testing.T) {
 	go ts.initServer(host, tsStopCh)
 	time.Sleep(1 * time.Second)
 
-	cp := newConnectionPool(host, 500, 10, 1024*1024, readDeadline, writeDeadline, 40, 10, 1, "")
+	var needsAuth uint32
+	cp := newConnectionPool(host, 500, 10, 1024*1024, readDeadline, writeDeadline, 40, 10, 1, "", &needsAuth)
 	cp.mkConn = testMkConn
 
 	// Get 240 Connections.
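The reworked doAuth gate above skips authentication only while both conditions hold: the published cluster version is still below INDEXER_71_VERSION and no queryport has yet demanded auth (the shared needsAuth word is still 0). A small sketch of that rule under simplified, assumed names (needAuth and indexer71Version are placeholders, not the real API):

    package main

    import (
        "fmt"
        "sync/atomic"
    )

    // indexer71Version is a placeholder for common.INDEXER_71_VERSION.
    const indexer71Version = 7

    // needAuth reports whether a new connection must authenticate: either the
    // cluster has finished upgrading to 7.1, or some queryport has already
    // rejected an unauthenticated request mid-upgrade (shared flag set).
    func needAuth(clusterVersion uint64, needsAuth *uint32) bool {
        return clusterVersion >= indexer71Version || atomic.LoadUint32(needsAuth) != 0
    }

    func main() {
        var flag uint32
        fmt.Println(needAuth(6, &flag)) // false: pre-7.1 cluster, no rejection seen
        atomic.StoreUint32(&flag, 1)    // a queryport replied AUTH_MISSING
        fmt.Println(needAuth(6, &flag)) // true: authenticate before the version bump
    }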
@@ -302,7 +305,8 @@ func TestSustainedHighConns(t *testing.T) {
 	go ts.initServer(host, tsStopCh)
 	time.Sleep(1 * time.Second)
 
-	cp := newConnectionPool(host, 500, 10, 1024*1024, readDeadline, writeDeadline, 40, 10, 1, "")
+	var needsAuth uint32
+	cp := newConnectionPool(host, 500, 10, 1024*1024, readDeadline, writeDeadline, 40, 10, 1, "", &needsAuth)
 	cp.mkConn = testMkConn
 
 	ch := make(chan *connection, 1000)
@@ -345,7 +349,8 @@ func TestLowWM(t *testing.T) {
 	go ts.initServer(host, tsStopCh)
 	time.Sleep(1 * time.Second)
 
-	cp := newConnectionPool(host, 20, 5, 1024*1024, readDeadline, writeDeadline, 10, 2, 1, "")
+	var needsAuth uint32
+	cp := newConnectionPool(host, 20, 5, 1024*1024, readDeadline, writeDeadline, 10, 2, 1, "", &needsAuth)
 	cp.mkConn = testMkConn
 
 	seenClients := map[*connection]bool{}
@@ -400,7 +405,8 @@ func TestTotalConns(t *testing.T) {
 	go ts.initServer(host, tsStopCh)
 	time.Sleep(1 * time.Second)
 
-	cp := newConnectionPool(host, 120, 5, 1024*1024, readDeadline, writeDeadline, 10, 10, 1, "")
+	var needsAuth uint32
+	cp := newConnectionPool(host, 120, 5, 1024*1024, readDeadline, writeDeadline, 10, 10, 1, "", &needsAuth)
 	cp.mkConn = testMkConn
 
 	seenClients := map[*connection]bool{}
@@ -484,7 +490,8 @@ func TestUpdateTickRate(t *testing.T) {
 	go ts.initServer(host, tsStopCh)
 	time.Sleep(1 * time.Second)
 
-	cp := newConnectionPool(host, 40, 5, 1024*1024, readDeadline, writeDeadline, 2, 2, 1, "")
+	var needsAuth uint32
+	cp := newConnectionPool(host, 40, 5, 1024*1024, readDeadline, writeDeadline, 2, 2, 1, "", &needsAuth)
 	cp.mkConn = testMkConn
 
 	seenClients := map[*connection]bool{}
diff --git a/secondary/queryport/client/scan_client.go b/secondary/queryport/client/scan_client.go
index 81df6d453..6fc8a0805 100644
--- a/secondary/queryport/client/scan_client.go
+++ b/secondary/queryport/client/scan_client.go
@@ -42,9 +42,10 @@ type GsiScanClient struct {
 
 	serverVersion uint32
 	closed        uint32
+	needsAuth     *uint32
 }
 
-func NewGsiScanClient(queryport, cluster string, config common.Config) (*GsiScanClient, error) {
+func NewGsiScanClient(queryport, cluster string, config common.Config, needsAuth *uint32) (*GsiScanClient, error) {
 	t := time.Duration(config["connPoolAvailWaitTimeout"].Int())
 	c := &GsiScanClient{
 		queryport: queryport,
@@ -58,11 +59,12 @@ func NewGsiScanClient(queryport, cluster string, config common.Config) (*GsiScan
 		logPrefix:        fmt.Sprintf("[GsiScanClient:%q]", queryport),
 		minPoolSizeWM:    int32(config["settings.minPoolSizeWM"].Int()),
 		relConnBatchSize: int32(config["settings.relConnBatchSize"].Int()),
+		needsAuth:        needsAuth,
 	}
 	c.pool = newConnectionPool(
 		queryport, c.poolSize, c.poolOverflow, c.maxPayload, c.cpTimeout,
 		c.cpAvailWaitTimeout, c.minPoolSizeWM, c.relConnBatchSize,
 		config["keepAliveInterval"].Int(),
-		cluster)
+		cluster, needsAuth)
 	logging.Infof("%v started ...\n", c.logPrefix)
 	if version, err := c.Helo(); err == nil || err == io.EOF {
@@ -1311,11 +1313,12 @@ REQUEST_RESPONSE_RETRY:
 	// the upgraded cluster version. From this point onwards, the client will
 	// start using auth for all new connections.
 
+	atomic.StoreUint32(c.needsAuth, uint32(1))
+
 	if rsp.GetCode() == transport.AUTH_MISSING && !authRetry {
 		// Do not count this as a "retry"
 		logging.Infof("%v server needs authentication information. Retrying "+
 			"request with auth req(%v)", c.logPrefix, requestId)
-		// TODO: Update cluster version
 		renew()
 		authRetry = true
 		goto REQUEST_RESPONSE_RETRY
@@ -1400,6 +1403,8 @@ func (c *GsiScanClient) streamResponse(
 	// When the cluster upgrade completes and the queryport starts supporting
 	// auth. See doRequestResponse for more details.
 
+	atomic.StoreUint32(c.needsAuth, uint32(1))
+
 	if rsp.GetCode() == transport.AUTH_MISSING {
 		// Do not count this as a "retry"
 		logging.Infof("%v server needs authentication information. Retrying "+
diff --git a/secondary/tests/functionaltests/set14_rebalance_test.go b/secondary/tests/functionaltests/set14_rebalance_test.go
index 01c5a54bb..809052979 100644
--- a/secondary/tests/functionaltests/set14_rebalance_test.go
+++ b/secondary/tests/functionaltests/set14_rebalance_test.go
@@ -207,6 +207,7 @@ func TestCreateIndexesBeforeRebalance(t *testing.T) {
 // Ending config: [0: kv n1ql] [1: index] [2: index] [3: index]
 func TestIndexNodeRebalanceIn(t *testing.T) {
 	addTwoNodesAndRebalance("TestIndexNodeRebalanceIn", t)
+	waitForRebalanceCleanup()
 }
 
 // addTwoNodesAndRebalance is the delegate of two tests that perform the same actions at different points
@@ -318,6 +319,7 @@ func TestIndexNodeRebalanceOut(t *testing.T) {
 	}
 
 	printClusterConfig(method, "exit")
+	waitForRebalanceCleanup()
 }
 
 // TestFailoverAndRebalance fails over node [2: index] from the cluster and rebalances.
@@ -344,6 +346,7 @@ func TestFailoverAndRebalance(t *testing.T) {
 	}
 
 	printClusterConfig(method, "exit")
+	waitForRebalanceCleanup()
 }
 
 // TestSwapRebalance adds node [1: index] to the cluster (without rebalancing), then rebalances out
@@ -372,6 +375,7 @@ func TestSwapRebalance(t *testing.T) {
 	}
 
 	printClusterConfig(method, "exit")
+	waitForRebalanceCleanup()
 }
 
 // TestRebalanceReplicaRepair adds nodes [2: index] and [3: index], then rebalances. The actions performed are
@@ -381,6 +385,7 @@ func TestSwapRebalance(t *testing.T) {
 // (Same as TestIndexNodeRebalanceIn.)
 func TestRebalanceReplicaRepair(t *testing.T) {
 	addTwoNodesAndRebalance("TestRebalanceReplicaRepair", t)
+	waitForRebalanceCleanup()
 }
 
 // TestRebalanceResetCluster restores indexer.settings.rebalance.redistribute_indexes = false
@@ -407,8 +412,15 @@ func TestRebalanceResetCluster(t *testing.T) {
 		t.Fatalf("%v: Unexpected cluster configuration: %v", method, status)
 	}
 	printClusterConfig(method, "exit")
+	waitForRebalanceCleanup()
 }
 
-// NOTE: Make sure that the last test in this file resets the cluster configuration
-// by calling setupCluster so that it doesn't break any other tests that run after
-// the tests in this file
+func waitForRebalanceCleanup() {
+	// This sleep prevents the next test from going ahead and creating
+	// indexes while rebalance cleanup is still in progress. (Rebalance
+	// cleanup can take up to 1 second on source or destination nodes
+	// after the master decides rebalance is done, because
+	// waitForIndexBuild sleeps for up to 1 second before it can observe
+	// the closure of the r.done channel.)
+	time.Sleep(2 * time.Second)
+}
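For context, the scan_client.go changes earlier in this patch make the AUTH_MISSING handling self-correcting: on the first such response the client marks the shared needsAuth flag (so every new connection authenticates) and retries the same request once with credentials. A simplified, hypothetical sketch of that retry shape (send, errAuthMissing and doRequest are stand-ins; the real code checks a response code rather than an error):

    package main

    import (
        "errors"
        "fmt"
        "sync/atomic"
    )

    // errAuthMissing stands in for a transport.AUTH_MISSING response code.
    var errAuthMissing = errors.New("AUTH_MISSING")

    // send is a stand-in for one queryport request/response exchange.
    func send(withAuth bool) error {
        if !withAuth {
            return errAuthMissing // upgraded server rejects unauthenticated requests
        }
        return nil
    }

    // doRequest retries exactly once with auth after the first AUTH_MISSING,
    // mirroring the authRetry flag in doRequestResponse.
    func doRequest(needsAuth *uint32) error {
        authRetry := false
        for {
            err := send(atomic.LoadUint32(needsAuth) != 0)
            if errors.Is(err, errAuthMissing) && !authRetry {
                atomic.StoreUint32(needsAuth, 1) // future connections will authenticate
                authRetry = true                 // not counted as a scan retry
                continue
            }
            return err
        }
    }

    func main() {
        var needsAuth uint32
        fmt.Println(doRequest(&needsAuth), atomic.LoadUint32(&needsAuth)) // <nil> 1
    }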