Merge remote-tracking branch 'couchbase/unstable' into HEAD
http://ci2i-unstable.northscale.in/gsi-22.02.2022-04.25.pass.html
Change-Id: I7db92444d25ced1be28ed8ecb1bf38aee24bf347
amithk committed Feb 22, 2022
2 parents 7dd7bb8 + f8a88e5 commit c833be6
Showing 5 changed files with 58 additions and 22 deletions.
17 changes: 12 additions & 5 deletions secondary/queryport/client/client.go
@@ -398,6 +398,7 @@ type GsiClient struct {
scanResponse int64
dataEncFmt uint32
qcLock sync.Mutex
needsAuth *uint32
}

// NewGsiClient returns client to access GSI cluster.
@@ -1461,7 +1462,7 @@ func (c *GsiClient) updateScanClients() {

if qc.IsClosed() {
logging.Infof("Found a closed scanclient for %v. Initializing a new scan client.", queryport)
if qc, err := NewGsiScanClient(queryport, c.cluster, c.config); err == nil {
if qc, err := NewGsiScanClient(queryport, c.cluster, c.config, c.needsAuth); err == nil {
clients[queryport] = qc
} else {
logging.Errorf("Unable to initialize gsi scanclient (%v)", err)
@@ -1475,7 +1476,7 @@ func (c *GsiClient) updateScanClients() {
}

for queryport := range newclients {
if qc, err := NewGsiScanClient(queryport, c.cluster, c.config); err == nil {
if qc, err := NewGsiScanClient(queryport, c.cluster, c.config, c.needsAuth); err == nil {
clients[queryport] = qc
} else {
logging.Errorf("Unable to initialize gsi scanclient (%v)", err)
@@ -1752,9 +1753,12 @@ func (c *GsiClient) getBucketHash(bucketn string) (uint64, bool) {
func makeWithCbq(cluster string, config common.Config, encryptLocalHost bool) (*GsiClient, error) {
var err error

var needsAuth uint32

c := &GsiClient{
cluster: cluster,
config: config,
cluster: cluster,
config: config,
needsAuth: &needsAuth,
}

if err := c.initSecurityContext(encryptLocalHost); err != nil {
@@ -1769,7 +1773,7 @@ func makeWithCbq(cluster string, config common.Config, encryptLocalHost bool) (*
}
clients := make(map[string]*GsiScanClient)
for _, queryport := range c.bridge.GetScanports() {
if qc, err := NewGsiScanClient(queryport, c.cluster, config); err == nil {
if qc, err := NewGsiScanClient(queryport, c.cluster, config, c.needsAuth); err == nil {
clients[queryport] = qc
}
}
@@ -1786,13 +1790,16 @@ func makeWithMetaProvider(
return nil, err
}

var needsAuth uint32

c = &GsiClient{
cluster: cluster,
config: config,
queryClients: unsafe.Pointer(new(map[string]*GsiScanClient)),
metaCh: make(chan bool, 1),
settings: NewClientSettings(needRefresh),
killch: make(chan bool, 1),
needsAuth: &needsAuth,
}

if err := c.initSecurityContext(encryptLocalHost); err != nil {
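
The core of this change is a single needsAuth flag owned by the GsiClient and shared, as a *uint32, with every GsiScanClient and connection pool it creates, so one AUTH_MISSING response is enough to switch the whole client over to authenticated connections. Below is a minimal sketch of that sharing pattern; the type and function names are illustrative stand-ins rather than the actual indexer types, and the flag is read and written with sync/atomic just as in the diff.

package main

import (
	"fmt"
	"sync/atomic"
)

// scanClient stands in for GsiScanClient: it keeps only a pointer to the
// shared flag, so every client created from the same GsiClient observes the
// same value.
type scanClient struct {
	needsAuth *uint32
}

func newScanClient(needsAuth *uint32) *scanClient {
	return &scanClient{needsAuth: needsAuth}
}

// markAuthRequired is what a client does once a server response shows that
// authentication is required.
func (c *scanClient) markAuthRequired() {
	atomic.StoreUint32(c.needsAuth, 1)
}

// authRequired is what a connection pool would consult before dialing.
func (c *scanClient) authRequired() bool {
	return atomic.LoadUint32(c.needsAuth) == 1
}

func main() {
	var needsAuth uint32 // owned by the top-level client; zero means "not known to be needed"

	a := newScanClient(&needsAuth)
	b := newScanClient(&needsAuth)

	a.markAuthRequired()          // one client learns auth is needed ...
	fmt.Println(b.authRequired()) // ... and every other client sees it: true
}
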
13 changes: 9 additions & 4 deletions secondary/queryport/client/conn_pool.go
@@ -50,6 +50,7 @@ type connectionPool struct {
kaInterval time.Duration
authHost string
cluster string
needsAuth *uint32
}

type connection struct {
@@ -68,7 +69,7 @@ func newConnectionPool(
poolSize, poolOverflow, maxPayload int,
timeout, availTimeout time.Duration,
minPoolSizeWM int32, relConnBatchSize int32, kaInterval int,
cluster string) *connectionPool {
cluster string, needsAuth *uint32) *connectionPool {

cp := &connectionPool{
host: host,
@@ -83,6 +84,7 @@ func newConnectionPool(
stopCh: make(chan bool, 1),
kaInterval: time.Duration(kaInterval) * time.Second,
cluster: cluster,
needsAuth: needsAuth,
}

// Ignore the error in initHostportForAuth, if any.
@@ -138,9 +140,12 @@ func (cp *connectionPool) defaultMkConn(host string) (*connection, error) {
func (cp *connectionPool) doAuth(conn *connection) error {

// Check if auth is supported / configured before doing auth
if common.GetClusterVersion() < common.INDEXER_71_VERSION {
logging.Verbosef("%v doAuth Auth is not needed for connection (%v,%v)",
cp.logPrefix, conn.conn.LocalAddr(), conn.conn.RemoteAddr())
clustVer := common.GetClusterVersion()
needsAuth := atomic.LoadUint32(cp.needsAuth)

if clustVer < common.INDEXER_71_VERSION && needsAuth == 0 {
logging.Verbosef("%v doAuth Auth is not needed for connection (%v,%v) clustVer %v, needsAuth %v ",
cp.logPrefix, conn.conn.LocalAddr(), conn.conn.RemoteAddr(), clustVer, needsAuth)
return nil
}

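
With the flag threaded into the pool, doAuth no longer decides based on the cluster version alone: authentication is skipped only while the cluster is still below 7.1 and no server has asked for credentials yet. A minimal sketch of that guard, assuming a stand-in constant for common.INDEXER_71_VERSION:

package main

import (
	"fmt"
	"sync/atomic"
)

// indexer71Version is a stand-in for common.INDEXER_71_VERSION.
const indexer71Version uint64 = 7

// skipAuth mirrors the new guard in doAuth: auth is skipped only while the
// cluster version is below 7.1 AND no server has demanded auth yet.
func skipAuth(clustVer uint64, needsAuth *uint32) bool {
	return clustVer < indexer71Version && atomic.LoadUint32(needsAuth) == 0
}

func main() {
	var needsAuth uint32

	fmt.Println(skipAuth(6, &needsAuth)) // true: mixed-version cluster, no AUTH_MISSING seen yet
	atomic.StoreUint32(&needsAuth, 1)    // some response came back AUTH_MISSING
	fmt.Println(skipAuth(6, &needsAuth)) // false: authenticate even on the older cluster version
}
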
21 changes: 14 additions & 7 deletions secondary/queryport/client/conn_pool_test.go
@@ -63,7 +63,8 @@ func TestConnPoolBasicSanity(t *testing.T) {
go ts.initServer(host, tsStopCh)
time.Sleep(1 * time.Second)

cp := newConnectionPool(host, 3, 6, 1024*1024, readDeadline, writeDeadline, 3, 1, 1, "")
var needsAuth uint32
cp := newConnectionPool(host, 3, 6, 1024*1024, readDeadline, writeDeadline, 3, 1, 1, "", &needsAuth)
cp.mkConn = testMkConn

seenClients := map[*connection]bool{}
@@ -106,7 +107,8 @@ func TestConnRelease(t *testing.T) {
go ts.initServer(host, tsStopCh)
time.Sleep(1 * time.Second)

cp := newConnectionPool(host, 500, 10, 1024*1024, readDeadline, writeDeadline, 40, 10, 1, "")
var needsAuth uint32
cp := newConnectionPool(host, 500, 10, 1024*1024, readDeadline, writeDeadline, 40, 10, 1, "", &needsAuth)
cp.mkConn = testMkConn

seenClients := map[*connection]bool{}
@@ -179,7 +181,8 @@ func TestLongevity(t *testing.T) {
go ts.initServer(host, tsStopCh)
time.Sleep(1 * time.Second)

cp := newConnectionPool(host, 500, 10, 1024*1024, readDeadline, writeDeadline, 40, 10, 1, "")
var needsAuth uint32
cp := newConnectionPool(host, 500, 10, 1024*1024, readDeadline, writeDeadline, 40, 10, 1, "", &needsAuth)
cp.mkConn = testMkConn

// Get 240 Connections.
@@ -302,7 +305,8 @@ func TestSustainedHighConns(t *testing.T) {
go ts.initServer(host, tsStopCh)
time.Sleep(1 * time.Second)

cp := newConnectionPool(host, 500, 10, 1024*1024, readDeadline, writeDeadline, 40, 10, 1, "")
var needsAuth uint32
cp := newConnectionPool(host, 500, 10, 1024*1024, readDeadline, writeDeadline, 40, 10, 1, "", &needsAuth)
cp.mkConn = testMkConn

ch := make(chan *connection, 1000)
@@ -345,7 +349,8 @@ func TestLowWM(t *testing.T) {
go ts.initServer(host, tsStopCh)
time.Sleep(1 * time.Second)

cp := newConnectionPool(host, 20, 5, 1024*1024, readDeadline, writeDeadline, 10, 2, 1, "")
var needsAuth uint32
cp := newConnectionPool(host, 20, 5, 1024*1024, readDeadline, writeDeadline, 10, 2, 1, "", &needsAuth)
cp.mkConn = testMkConn

seenClients := map[*connection]bool{}
@@ -400,7 +405,8 @@ func TestTotalConns(t *testing.T) {
go ts.initServer(host, tsStopCh)
time.Sleep(1 * time.Second)

cp := newConnectionPool(host, 120, 5, 1024*1024, readDeadline, writeDeadline, 10, 10, 1, "")
var needsAuth uint32
cp := newConnectionPool(host, 120, 5, 1024*1024, readDeadline, writeDeadline, 10, 10, 1, "", &needsAuth)
cp.mkConn = testMkConn

seenClients := map[*connection]bool{}
@@ -484,7 +490,8 @@ func TestUpdateTickRate(t *testing.T) {
go ts.initServer(host, tsStopCh)
time.Sleep(1 * time.Second)

cp := newConnectionPool(host, 40, 5, 1024*1024, readDeadline, writeDeadline, 2, 2, 1, "")
var needsAuth uint32
cp := newConnectionPool(host, 40, 5, 1024*1024, readDeadline, writeDeadline, 2, 2, 1, "", &needsAuth)
cp.mkConn = testMkConn

seenClients := map[*connection]bool{}
11 changes: 8 additions & 3 deletions secondary/queryport/client/scan_client.go
@@ -42,9 +42,10 @@ type GsiScanClient struct {

serverVersion uint32
closed uint32
needsAuth *uint32
}

func NewGsiScanClient(queryport, cluster string, config common.Config) (*GsiScanClient, error) {
func NewGsiScanClient(queryport, cluster string, config common.Config, needsAuth *uint32) (*GsiScanClient, error) {
t := time.Duration(config["connPoolAvailWaitTimeout"].Int())
c := &GsiScanClient{
queryport: queryport,
@@ -58,11 +59,12 @@ func NewGsiScanClient(queryport, cluster string, config common.Config) (*GsiScan
logPrefix: fmt.Sprintf("[GsiScanClient:%q]", queryport),
minPoolSizeWM: int32(config["settings.minPoolSizeWM"].Int()),
relConnBatchSize: int32(config["settings.relConnBatchSize"].Int()),
needsAuth: needsAuth,
}
c.pool = newConnectionPool(
queryport, c.poolSize, c.poolOverflow, c.maxPayload, c.cpTimeout,
c.cpAvailWaitTimeout, c.minPoolSizeWM, c.relConnBatchSize, config["keepAliveInterval"].Int(),
cluster)
cluster, needsAuth)
logging.Infof("%v started ...\n", c.logPrefix)

if version, err := c.Helo(); err == nil || err == io.EOF {
@@ -1311,11 +1313,12 @@ REQUEST_RESPONSE_RETRY:
// the upgraded cluster version. From this point onwards, the client will
// start using auth for all new connections.

atomic.StoreUint32(c.needsAuth, uint32(1))

if rsp.GetCode() == transport.AUTH_MISSING && !authRetry {
// Do not count this as a "retry"
logging.Infof("%v server needs authentication information. Retrying "+
"request with auth req(%v)", c.logPrefix, requestId)
// TODO: Update cluster version
renew()
authRetry = true
goto REQUEST_RESPONSE_RETRY
@@ -1400,6 +1403,8 @@ func (c *GsiScanClient) streamResponse(
// When the cluster upgrade completes and the queryport starts supporting
// auth. See doRequestResponse for more details.

atomic.StoreUint32(c.needsAuth, uint32(1))

if rsp.GetCode() == transport.AUTH_MISSING {
// Do not count this as a "retry"
logging.Infof("%v server needs authentication information. Retrying "+
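
On the request path, the first AUTH_MISSING reply both sets the shared flag and retries the request once with credentials; streamResponse records the flag the same way. The sketch below shows that retry shape under simplified assumptions: errAuthMissing stands in for transport.AUTH_MISSING, the send callback replaces the real connection handling, and the renew()/authRetry bookkeeping is reduced to booleans.

package main

import (
	"errors"
	"fmt"
	"sync/atomic"
)

// errAuthMissing stands in for a transport.AUTH_MISSING response code.
var errAuthMissing = errors.New("AUTH_MISSING")

// doRequest sketches the retry shape of doRequestResponse: the first
// AUTH_MISSING reply flips the shared flag and triggers exactly one retry
// with credentials attached; later requests start out authenticated.
func doRequest(send func(withAuth bool) error, needsAuth *uint32) error {
	authRetry := false
	withAuth := atomic.LoadUint32(needsAuth) == 1
	for {
		err := send(withAuth)
		if errors.Is(err, errAuthMissing) && !authRetry {
			atomic.StoreUint32(needsAuth, 1) // all new connections will now authenticate
			withAuth, authRetry = true, true // this attempt is not counted as a normal retry
			continue
		}
		return err
	}
}

func main() {
	var needsAuth uint32
	calls := 0
	send := func(withAuth bool) error {
		calls++
		if !withAuth {
			return errAuthMissing // pre-auth request hitting an upgraded queryport
		}
		return nil
	}
	err := doRequest(send, &needsAuth)
	fmt.Println(err, calls, atomic.LoadUint32(&needsAuth)) // prints: <nil> 2 1
}
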
18 changes: 15 additions & 3 deletions secondary/tests/functionaltests/set14_rebalance_test.go
@@ -207,6 +207,7 @@ func TestCreateIndexesBeforeRebalance(t *testing.T) {
// Ending config: [0: kv n1ql] [1: index] [2: index] [3: index]
func TestIndexNodeRebalanceIn(t *testing.T) {
addTwoNodesAndRebalance("TestIndexNodeRebalanceIn", t)
waitForRebalanceCleanup()
}

// addTwoNodesAndRebalance is the delegate of two tests that perform the same actions at different points
@@ -318,6 +319,7 @@ func TestIndexNodeRebalanceOut(t *testing.T) {
}

printClusterConfig(method, "exit")
waitForRebalanceCleanup()
}

// TestFailoverAndRebalance fails over node [2: index] from the cluster and rebalances.
@@ -344,6 +346,7 @@ func TestFailoverAndRebalance(t *testing.T) {
}

printClusterConfig(method, "exit")
waitForRebalanceCleanup()
}

// TestSwapRebalance adds node [1: index] to the cluster (without rebalancing), then rebalances out
@@ -372,6 +375,7 @@ func TestSwapRebalance(t *testing.T) {
}

printClusterConfig(method, "exit")
waitForRebalanceCleanup()
}

// TestRebalanceReplicaRepair adds nodes [2: index] and [3: index], then rebalances. The actions performed are
@@ -381,6 +385,7 @@ func TestSwapRebalance(t *testing.T) {
// (Same as TestIndexNodeRebalanceIn.)
func TestRebalanceReplicaRepair(t *testing.T) {
addTwoNodesAndRebalance("TestRebalanceReplicaRepair", t)
waitForRebalanceCleanup()
}

// TestRebalanceResetCluster restores indexer.settings.rebalance.redistribute_indexes = false
@@ -407,8 +412,15 @@ func TestRebalanceResetCluster(t *testing.T) {
t.Fatalf("%v: Unexpected cluster configuration: %v", method, status)
}
printClusterConfig(method, "exit")
waitForRebalanceCleanup()
}

// NOTE: Make sure that the last test in this file resets the cluster configuration
// by calling setupCluster so that it doesn't break any other tests that run after
// the tests in this file
func waitForRebalanceCleanup() {
// This wait prevents the next test from going ahead and creating
// indexes while rebalance cleanup is still in progress. (Rebalance cleanup
// can take up to 1 second on source or destination nodes after the
// master decides that rebalance is done, because waitForIndexBuild
// sleeps for up to 1 second before it can observe the closure of the
// r.done channel.)
time.Sleep(2 * time.Second)
}
