Merge remote-tracking branch 'couchbase/unstable' into HEAD
http://ci2i-unstable.northscale.in/gsi-27.01.2022-09.30.pass.html
Change-Id: Ifb1f77270b555d0ce7020b134f02243db737180f
amithk committed Jan 27, 2022
2 parents e5ab9da + b4a9e7a commit 29df58c
Showing 7 changed files with 54 additions and 29 deletions.
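The diffs below apply one pattern throughout: every read through the swappable cinfoProvider pointer now holds cinfoProviderLock.RLock()/RUnlock(), and config updates that install a new provider capture the old pointer inside the write-locked section instead of before it. The following is a minimal, self-contained sketch of that pattern; the type and function names are illustrative stand-ins rather than the actual indexer types, and closing the old provider after the swap is an assumption made for the example.

package main

import (
	"fmt"
	"sync"
)

// infoProvider stands in for common.ClusterInfoProvider; only the calls
// needed by the sketch are modelled.
type infoProvider interface {
	ClusterVersion() uint64
	Close()
}

type fakeProvider struct{ version uint64 }

func (p *fakeProvider) ClusterVersion() uint64 { return p.version }
func (p *fakeProvider) Close()                 {}

// holder mirrors the fields this commit touches: a provider pointer
// guarded by an RWMutex.
type holder struct {
	cinfoProviderLock sync.RWMutex
	cinfoProvider     infoProvider
}

// clusterVersion is the read side: hold the read lock for the whole call
// so a concurrent config update cannot swap the pointer mid-use.
func (h *holder) clusterVersion() uint64 {
	h.cinfoProviderLock.RLock()
	defer h.cinfoProviderLock.RUnlock()
	return h.cinfoProvider.ClusterVersion()
}

// applyConfigUpdate is the write side: capture the old pointer and install
// the new one inside the same write-locked section, then release the old
// provider outside the lock.
func (h *holder) applyConfigUpdate(newProvider infoProvider) {
	h.cinfoProviderLock.Lock()
	oldProvider := h.cinfoProvider
	h.cinfoProvider = newProvider
	h.cinfoProviderLock.Unlock()

	if oldProvider != nil {
		oldProvider.Close()
	}
}

func main() {
	h := &holder{cinfoProvider: &fakeProvider{version: 6}}
	fmt.Println("cluster version before update:", h.clusterVersion())
	h.applyConfigUpdate(&fakeProvider{version: 7})
	fmt.Println("cluster version after update:", h.clusterVersion())
}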
8 changes: 6 additions & 2 deletions secondary/indexer/indexer.go
@@ -1667,9 +1667,8 @@ func (idx *indexer) handleConfigUpdate(msg Message) {
common.CrashOnError(err)
}

oldPtr := idx.cinfoProvider

idx.cinfoProviderLock.Lock()
oldPtr := idx.cinfoProvider
idx.cinfoProvider = cip
idx.cinfoProviderLock.Unlock()

@@ -1930,6 +1929,9 @@ func (idx *indexer) isAllowedEphemeral(bucket string) (bool, string, error) {
return true, "", nil
}

idx.cinfoProviderLock.RLock()
defer idx.cinfoProviderLock.RUnlock()

cVersion := idx.cinfoProvider.ClusterVersion()
if cVersion < common.INDEXER_70_VERSION {
retMsg := fmt.Sprintf("Bucket %v is Ephemeral. Standard GSI index on Ephemeral buckets"+
@@ -4641,7 +4643,9 @@ func (idx *indexer) sendStreamUpdateForBuildIndex(instIdList []common.IndexInstI
enableOSO := idx.config["build.enableOSO"].Bool()

bucket := GetBucketFromKeyspaceId(keyspaceId)
idx.cinfoProviderLock.RLock()
isMagmaStorage, err := idx.cinfoProvider.IsMagmaStorage(bucket)
idx.cinfoProviderLock.RUnlock()
if err != nil {
logging.Errorf("Indexer::sendStreamUpdateForBuildIndex %v %v. Unable to check bucket storage "+
"backend err %v", buildStream, keyspaceId, err)
46 changes: 31 additions & 15 deletions secondary/indexer/kv_sender.go
@@ -95,6 +95,13 @@ func NewKVSender(supvCmdch MsgChannel, supvRespch MsgChannel,

}

func (k *kvSender) FetchCInfoWithLock() {
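// The read lock guards the cinfoProvider pointer itself, which
// handleConfigUpdate can swap under the write lock; FetchWithLock then
// force-refreshes the underlying cluster info cache.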
k.cinfoProviderLock.RLock()
defer k.cinfoProviderLock.RUnlock()

k.cinfoProvider.FetchWithLock()
}

//run starts the kvsender loop which listens to messages
//from its supervisor (indexer)
func (k *kvSender) run() {
@@ -285,7 +292,9 @@ func (k *kvSender) openMutationStream(streamId c.StreamId, keyspaceId string,
return
}

k.cinfoProviderLock.RLock()
protoInstList := convertIndexListToProto(k.config, k.cinfoProvider, indexInstList, streamId)
k.cinfoProviderLock.RUnlock()

bucket, _, _ := SplitKeyspaceId(keyspaceId)

@@ -295,7 +304,7 @@ func (k *kvSender) openMutationStream(streamId c.StreamId, keyspaceId string,
// This failure could be due to stale cluster info cache
// Force fetch cluster info cache so that the next call
// might succeed
k.cinfoProvider.FetchWithLock()
k.FetchCInfoWithLock()
logging.Errorf("KVSender::openMutationStream %v %v Error in fetching vbuckets info %v",
streamId, bucket, err)
respCh <- &MsgError{
@@ -417,7 +426,7 @@ func (k *kvSender) openMutationStream(streamId c.StreamId, keyspaceId string,
// The failure could have been due to stale cluster info cache
// Force update cluster info cache on failure so that the next
// retry might succeed
k.cinfoProvider.FetchWithLock()
k.FetchCInfoWithLock()

logging.Errorf("KVSender::openMutationStream %v %v Error from Projector %v",
streamId, keyspaceId, err)
@@ -447,7 +456,7 @@ func (k *kvSender) restartVbuckets(streamId c.StreamId, keyspaceId string,
bucket, _, _ := SplitKeyspaceId(keyspaceId)
addrs, err := k.getProjAddrsForVbuckets(bucket, repairVbs)
if err != nil {
k.cinfoProvider.FetchWithLock()
k.FetchCInfoWithLock()

logging.Errorf("KVSender::restartVbuckets %v %v Error in fetching cluster info %v",
streamId, keyspaceId, err)
@@ -572,7 +581,7 @@ func (k *kvSender) restartVbuckets(streamId c.StreamId, keyspaceId string,
// The failure could have been due to stale cluster info cache
// Force update cluster info cache on failure so that the next
// retry might succeed
k.cinfoProvider.FetchWithLock()
k.FetchCInfoWithLock()

respCh <- &MsgError{
err: Error{code: ERROR_KVSENDER_STREAM_REQUEST_ERROR,
@@ -607,7 +616,7 @@ func (k *kvSender) addIndexForExistingKeyspace(streamId c.StreamId, keyspaceId s
bucket, _, _ := SplitKeyspaceId(keyspaceId)
_, addrs, err := k.getAllVbucketsInCluster(bucket)
if err != nil {
k.cinfoProvider.FetchWithLock()
k.FetchCInfoWithLock()

logging.Errorf("KVSender::addIndexForExistingKeyspace %v %v Error in fetching cluster info %v",
streamId, keyspaceId, err)
@@ -619,7 +628,9 @@ func (k *kvSender) addIndexForExistingKeyspace(streamId c.StreamId, keyspaceId s
}

var currentTs *protobuf.TsVbuuid
k.cinfoProviderLock.RLock()
protoInstList := convertIndexListToProto(k.config, k.cinfoProvider, indexInstList, streamId)
k.cinfoProviderLock.RUnlock()
topic := getTopicForStreamId(streamId)

fn := func(r int, err error) error {
@@ -670,7 +681,7 @@ func (k *kvSender) addIndexForExistingKeyspace(streamId c.StreamId, keyspaceId s
// The failure could have been due to stale cluster info cache
// Force update cluster info cache on failure so that the next
// retry might succeed
k.cinfoProvider.FetchWithLock()
k.FetchCInfoWithLock()

logging.Errorf("KVSender::addIndexForExistingKeyspace %v %v Error from Projector %v",
streamId, keyspaceId, err)
@@ -695,7 +706,7 @@ func (k *kvSender) deleteIndexesFromStream(streamId c.StreamId, keyspaceId strin

addrs, err := k.getAllProjectorAddrs()
if err != nil {
k.cinfoProvider.FetchWithLock()
k.FetchCInfoWithLock()

logging.Errorf("KVSender::deleteIndexesFromStream %v %v Error in fetching cluster info %v",
streamId, keyspaceId, err)
@@ -760,7 +771,7 @@ func (k *kvSender) deleteIndexesFromStream(streamId c.StreamId, keyspaceId strin
// The failure could have been due to stale cluster info cache
// Force update cluster info cache on failure so that the next
// retry might succeed
k.cinfoProvider.FetchWithLock()
k.FetchCInfoWithLock()

logging.Errorf("KVSender::deleteIndexesFromStream %v %v Error from Projector %v",
streamId, keyspaceId, err)
@@ -779,7 +790,7 @@ func (k *kvSender) deleteKeyspacesFromStream(streamId c.StreamId, keyspaceIds []

addrs, err := k.getAllProjectorAddrs()
if err != nil {
k.cinfoProvider.FetchWithLock()
k.FetchCInfoWithLock()

logging.Errorf("KVSender::deleteKeyspacesFromStream %v %v Error in fetching cluster info %v",
streamId, keyspaceIds, err)
@@ -837,7 +848,7 @@ func (k *kvSender) deleteKeyspacesFromStream(streamId c.StreamId, keyspaceIds []
// The failure could have been due to stale cluster info cache
// Force update cluster info cache on failure so that the next
// retry might succeed
k.cinfoProvider.FetchWithLock()
k.FetchCInfoWithLock()

logging.Errorf("KVSender::deleteKeyspacesFromStream %v %v Error from Projector %v",
streamId, keyspaceIds, err)
@@ -856,7 +867,7 @@ func (k *kvSender) closeMutationStream(streamId c.StreamId, keyspaceId string,

addrs, err := k.getAllProjectorAddrs()
if err != nil {
k.cinfoProvider.FetchWithLock()
k.FetchCInfoWithLock()

logging.Errorf("KVSender::closeMutationStream %v %v Error in fetching cluster info %v",
streamId, keyspaceId, err)
@@ -914,7 +925,7 @@ func (k *kvSender) closeMutationStream(streamId c.StreamId, keyspaceId string,
// The failure could have been due to stale cluster info cache
// Force update cluster info cache on failure so that the next
// retry might succeed
k.cinfoProvider.FetchWithLock()
k.FetchCInfoWithLock()

logging.Errorf("KVSender::closeMutationStream, %v %v Error from Projector %v",
streamId, keyspaceId, err)
@@ -1309,7 +1320,7 @@ func (k *kvSender) getFailoverLogs(bucket string,

addrs, err := k.getAllProjectorAddrs()
if err != nil {
k.cinfoProvider.FetchWithLock()
k.FetchCInfoWithLock()
return nil, err
}

@@ -1343,7 +1354,9 @@ loop:

func (k *kvSender) getAllVbucketsInCluster(bucket string) ([]uint32, []string, error) {

k.cinfoProviderLock.RLock()
binfo, err := k.cinfoProvider.GetBucketInfoProvider(bucket)
k.cinfoProviderLock.RUnlock()
if err != nil {
return nil, nil, err
}
@@ -1384,7 +1397,9 @@ func (k *kvSender) getAllVbucketsInCluster(bucket string) ([]uint32, []string, e

func (k *kvSender) getAllProjectorAddrs() ([]string, error) {

k.cinfoProviderLock.RLock()
ninfo, err := k.cinfoProvider.GetNodesInfoProvider()
k.cinfoProviderLock.RUnlock()
if err != nil {
return nil, err
}
@@ -1408,7 +1423,9 @@ func (k *kvSender) getAllProjectorAddrs() ([]string, error) {

func (k *kvSender) getProjAddrsForVbuckets(bucket string, vbnos []Vbucket) ([]string, error) {

k.cinfoProviderLock.RLock()
binfo, err := k.cinfoProvider.GetBucketInfoProvider(bucket)
k.cinfoProviderLock.RUnlock()
if err != nil {
return nil, err
}
@@ -1475,9 +1492,8 @@ func (k *kvSender) handleConfigUpdate(cmd Message) {
}
cip.SetMaxRetries(MAX_CLUSTER_FETCH_RETRY)

oldProvider := k.cinfoProvider

k.cinfoProviderLock.Lock()
oldProvider := k.cinfoProvider
k.cinfoProvider = cip
k.cinfoProviderLock.Unlock()

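The error paths above all go through the new FetchCInfoWithLock helper so that a stale cluster-info cache is refreshed before the caller retries. A rough sketch of that call-site shape follows; it uses the real getAllProjectorAddrs and FetchCInfoWithLock methods, but the attemptWithRefresh wrapper and its retry count are hypothetical and not part of the actual kvSender code.

// attemptWithRefresh illustrates the kv_sender error-path convention: on
// failure, force-refresh the cluster info cache so that the next retry
// sees fresh topology.
func (k *kvSender) attemptWithRefresh(maxRetries int) ([]string, error) {
	var addrs []string
	var err error
	for i := 0; i < maxRetries; i++ {
		addrs, err = k.getAllProjectorAddrs()
		if err == nil {
			return addrs, nil
		}
		// The failure could be due to a stale cluster info cache; force a
		// fetch (under the provider read lock) so the retry might succeed.
		k.FetchCInfoWithLock()
	}
	return nil, err
}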
11 changes: 7 additions & 4 deletions secondary/indexer/sched_index_creator.go
@@ -265,7 +265,6 @@ func (m *schedIndexCreator) handleSupervisorCommands(cmd Message) {
if oldUseCInfoLite != newUseCInfoLite {
logging.Infof("schedIndexCreator:handleSupervisorCommands Updating ClusterInfoProvider in schedIndexCreator")

oldProvider := m.cinfoProvider
cip := m.getCinfoNoLock()
if cip == nil {
logging.Warnf("schedIndexCreator:handleSupervisorCommands Unable to update ClusterInfoProvider in schedIndexCreator use_cinfo_lite: old %v new %v",
@@ -277,6 +276,7 @@ func (m *schedIndexCreator) handleSupervisorCommands(cmd Message) {
// Wherever cinfoProvider is used, a nil check is done and, if it is
// nil, it is fetched again at that point
m.cinfoProviderLock.Lock()
oldProvider := m.cinfoProvider
m.cinfoProvider = cip
m.cinfoProviderLock.Unlock()

@@ -735,7 +735,9 @@ func (m *schedIndexCreator) orphanTokenMover() {
continue
}

m.cinfoProviderLock.Lock()
ninfo, e := m.cinfoProvider.GetNodesInfoProvider()
m.cinfoProviderLock.Unlock()
if e != nil {
logging.Errorf("schedIndexCreator:orphanTokenMover GetNodesInfoProvider returned err: %v", e)
continue
@@ -845,7 +847,9 @@ func (m *schedIndexCreator) keyspaceMonitor() {

defn := schedToken.Definition

m.cinfoProviderLock.Lock()
collnInfo, e := m.cinfoProvider.GetCollectionInfoProvider(defn.Bucket)
m.cinfoProviderLock.Unlock()
if e != nil {
logging.Errorf("schedIndexCreator:keyspaceMonitor GetCollectionInfoProvider returned err: %v", e)
continue
@@ -858,10 +862,9 @@
var ok bool
if bucketUuid, ok = buckets[defn.Bucket]; !ok {
cont := func() bool {
collnInfo.RLock()
defer collnInfo.RUnlock()

m.cinfoProviderLock.Lock()
bucketUuid, err = m.cinfoProvider.GetBucketUUID(defn.Bucket)
m.cinfoProviderLock.Unlock()
if err != nil {
logging.Errorf("schedIndexCreator:keyspaceMonitor GetBucketUUID returned err: %v", e)
return true
2 changes: 1 addition & 1 deletion secondary/manager/lifecycle.go
@@ -3021,7 +3021,6 @@ func (m *LifecycleMgr) handleConfigUpdate(content []byte) (err error) {
if oldUseCInfoLite != newUseCInfoLite {
logging.Infof("LifecycleMgr.handleConfigUpdate() Updating ClusterInfoProvider in LifecycleMgr")

oldPtr := m.cinfoProvider
cip, err := common.NewClusterInfoProvider(newUseCInfoLite,
m.clusterURL, common.DEFAULT_POOL, "LifecycleMgr", *config)
if err != nil {
@@ -3031,6 +3030,7 @@ }
}

m.cinfoProviderLock.Lock()
oldPtr := m.cinfoProvider
m.cinfoProvider = cip
m.cinfoProviderLock.Unlock()

12 changes: 6 additions & 6 deletions secondary/manager/manager.go
@@ -309,17 +309,17 @@ func (m *IndexManager) Close() {
m.repo.Close()
}

m.cinfoProviderLock.Lock()
if m.cinfoProvider != nil {
m.cinfoProviderLock.Lock()
m.cinfoProvider.Close()
m.cinfoProviderLock.Unlock()
}
m.cinfoProviderLock.Unlock()

m.cinfoProviderLockReqHandler.Lock()
if m.cinfoProviderReqHandler != nil {
m.cinfoProviderLockReqHandler.Lock()
m.cinfoProviderReqHandler.Close()
m.cinfoProviderLockReqHandler.Unlock()
}
m.cinfoProviderLockReqHandler.Unlock()

if m.monitorKillch != nil {
close(m.monitorKillch)
@@ -727,7 +727,6 @@ func (m *IndexManager) NotifyConfigUpdate(config common.Config) error {

logging.Infof("IndexManager.NotifyConfigUpdate(): Updating ClusterInfoProvider in IndexManager")

oldPtr := m.cinfoProvider
var cip common.ClusterInfoProvider
cip, err = common.NewClusterInfoProvider(useCInfoLite,
m.clusterURL, common.DEFAULT_POOL, "IndexMgr", config)
@@ -738,6 +737,7 @@ func (m *IndexManager) NotifyConfigUpdate(config common.Config) error {
}

m.cinfoProviderLock.Lock()
oldPtr := m.cinfoProvider
m.cinfoProvider = cip
m.useCInfoLite = useCInfoLite
m.cinfoProviderLock.Unlock()
@@ -755,7 +755,6 @@ func (m *IndexManager) NotifyConfigUpdate(config common.Config) error {

logging.Infof("IndexManager.NotifyConfigUpdate(): Updating ClusterInfoProvider in RequestHandler")

oldPtrReqHandler := m.cinfoProviderReqHandler
var cipReqHandler common.ClusterInfoProvider
cipReqHandler, errReqHandler = common.NewClusterInfoProvider(useCInfoLite,
m.clusterURL, common.DEFAULT_POOL, "RequestHandler", config)
@@ -766,6 +765,7 @@ func (m *IndexManager) NotifyConfigUpdate(config common.Config) error {
}

m.cinfoProviderLockReqHandler.Lock()
oldPtrReqHandler := m.cinfoProviderReqHandler
m.cinfoProviderReqHandler = cipReqHandler
m.useCInfoLiteReqHandler = useCInfoLite
m.cinfoProviderLockReqHandler.Unlock()
2 changes: 2 additions & 0 deletions secondary/manager/request_handler.go
@@ -2893,7 +2893,9 @@ func (m *requestHandlerContext) validateScheduleCreateRequest(req *client.Schedu
return "", "", "", err
}

m.mgr.cinfoProviderLockReqHandler.RLock()
bucketUUID, err = m.mgr.cinfoProviderReqHandler.GetBucketUUID(defn.Bucket)
m.mgr.cinfoProviderLockReqHandler.RUnlock()
if err != nil || bucketUUID == common.BUCKET_UUID_NIL {
return "", "", "", common.ErrBucketNotFound
}
2 changes: 1 addition & 1 deletion secondary/projector/projector.go
@@ -209,7 +209,6 @@ func NewProjector(maxvbs int, config c.Config, certFile, keyFile, caFile string)
if oldUseCInfoLite != newUseCInfoLite {
logging.Infof("Updating ClusterInfoProvider in projector")

oldPtr := p.cinfoProvider
cip, err := c.NewClusterInfoProvider(newUseCInfoLite, p.clusterAddr, "default",
"projector", p.config.SectionConfig("projector.", true))
if err != nil {
@@ -220,6 +219,7 @@ func NewProjector(maxvbs int, config c.Config, certFile, keyFile, caFile string)
cip.SetRetryInterval(4)

p.cinfoProviderLock.Lock()
oldPtr := p.cinfoProvider
p.cinfoProvider = cip
p.cinfoProviderLock.Unlock()

Expand Down
