Skip to content

Commit

Permalink
Merge remote-tracking branch 'couchbase/unstable' into HEAD
Browse files Browse the repository at this point in the history
http: //ci2i-unstable.northscale.in/gsi-12.10.2022-02.57.pass.html
Change-Id: I8b6a7a9b59fbeba3b37cfb9bf0c2313389d7df28
  • Loading branch information
amithk committed Oct 12, 2022
2 parents 578b600 + f197444 commit cca8d6d
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 41 deletions.
14 changes: 14 additions & 0 deletions secondary/manager/client/metadata_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,20 @@ var VALID_PARAM_NAMES = []string{"nodes", "defer_build", "retain_deleted_xattr",

var ErrWaitScheduleTimeout = fmt.Errorf("Timeout in checking for schedule create token.")

func (im *IndexMetadata) Clone() *IndexMetadata {
return &IndexMetadata{
Definition: im.Definition,
Instances: im.Instances,
InstsInRebalance: im.InstsInRebalance,
Stats: im.Stats,
// Immutable fields
State: im.State,
Error: im.Error,
Scheduled: im.Scheduled,
ScheduleFailed: im.ScheduleFailed,
}
}

///////////////////////////////////////////////////////
// Public function : MetadataProvider
///////////////////////////////////////////////////////
Expand Down
81 changes: 40 additions & 41 deletions secondary/queryport/client/meta_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ type metadataClient struct {
indexers unsafe.Pointer // *indexTopology

// comboIndexeCache is a ptr to a cached object containing a comboIndexes slice that holds both
// 1. Scheduled indexes from schedTokenMonitor.scheduledIndexes (via getIndexesCached)
// 1. Scheduled indexes from schedTokenMonitor.scheduledIndexes (via getSchedIndexes)
// 2. Existing indexes from indexers.allIndexes
// and a metaVersion field saving the corresponding indexers.version corresponding to #2.
// It is set to nil when a cache invalidating change occurs. Both the invalidating change and
Expand Down Expand Up @@ -226,7 +226,7 @@ func (b *metadataClient) Refresh() (indexes []*mclient.IndexMetadata, metaVersio
// comboIndexCacheMut here for comboIndexesCache coherence
currmeta = (*indexTopology)(atomic.LoadPointer(&b.indexers))
metaVersion = currmeta.version
schedIndexes := b.schedTokenMon.getIndexesCached()
schedIndexes := b.schedTokenMon.getSchedIndexes()

// Create new cache entry
comboIndexCache = &comboIndexCacheEntry{
Expand Down Expand Up @@ -2176,13 +2176,15 @@ func getWithAuth(url string) (*http.Response, error) {

// schedTokenMonitor monitors tokens for scheduled but not yet created indexes.
type schedTokenMonitor struct {
metaClient *metadataClient // parent of this object
scheduledIndexes []*mclient.IndexMetadata // scheduled indexes implied by unprocessed tokens
listener *mc.CommandListener
lock sync.Mutex
lCloseCh chan bool
processed map[string]bool
uCloseCh chan bool
metaClient *metadataClient // parent of this object
listener *mc.CommandListener
lCloseCh chan bool

schedIdxHolder unsafe.Pointer

// For Updater
uCloseCh chan bool
processed map[string]bool
}

// newSchedTokenMonitor launches the updater goroutine that periodically monitors scheduled index
Expand All @@ -2193,11 +2195,10 @@ func newSchedTokenMonitor(metaClient *metadataClient) *schedTokenMonitor {
listener := mc.NewCommandListener(lCloseCh, false, false, false, false, true, true)

s := &schedTokenMonitor{
metaClient: metaClient,
scheduledIndexes: make([]*mclient.IndexMetadata, 0),
listener: listener,
lCloseCh: lCloseCh,
processed: make(map[string]bool),
metaClient: metaClient,
listener: listener,
lCloseCh: lCloseCh,
processed: make(map[string]bool),
}

s.uCloseCh = make(chan bool)
Expand Down Expand Up @@ -2231,7 +2232,8 @@ func (s *schedTokenMonitor) markProcessed(key string) {

// getIndexesFromTokens computes the future indexes implied by scheduled index tokens.
func (s *schedTokenMonitor) getIndexesFromTokens(createTokens map[string]*mc.ScheduleCreateToken,
stopTokens map[string]*mc.StopScheduleCreateToken) []*mclient.IndexMetadata {
stopTokens map[string]*mc.StopScheduleCreateToken,
scheduledIndexes []*mclient.IndexMetadata) []*mclient.IndexMetadata {

indexes := make([]*mclient.IndexMetadata, 0, len(createTokens)) // return value

Expand Down Expand Up @@ -2261,18 +2263,6 @@ func (s *schedTokenMonitor) getIndexesFromTokens(createTokens map[string]*mc.Sch
if stopToken != nil {
logging.Debugf("schedTokenMonitor:getIndexesFromTokens stop schedule token exists for %v",
token.Definition.DefnId)
if s.checkProcessed(key) {
logging.Debugf("schedTokenMonitor::getIndexesFromTokens marking index as failed for %v", key)
marked := s.markIndexFailed(stopToken)
if marked {
continue
} else {
// This is unexpected as checkProcessed for this key true.
// Which means the index should have been found in the s.indexrs.
logging.Warnf("schedTokenMonitor:getIndexesFromTokens failed to mark index as failed for %v",
token.Definition.DefnId)
}
}

continue
}
Expand All @@ -2285,7 +2275,7 @@ func (s *schedTokenMonitor) getIndexesFromTokens(createTokens map[string]*mc.Sch
// If create token was already processed, then just mark the
// index as failed.
logging.Debugf("schedTokenMonitor::getIndexesFromTokens new stop schedule create token %v", key)
marked := s.markIndexFailed(token)
marked := s.markIndexFailed(token, scheduledIndexes)
if marked {
s.markProcessed(key)
continue
Expand Down Expand Up @@ -2315,10 +2305,11 @@ func (s *schedTokenMonitor) getIndexesFromTokens(createTokens map[string]*mc.Sch
return indexes
}

func (s *schedTokenMonitor) markIndexFailed(token *mc.StopScheduleCreateToken) bool {
func (s *schedTokenMonitor) markIndexFailed(token *mc.StopScheduleCreateToken,
scheduledIndexes []*mclient.IndexMetadata) bool {
// Note that this is an idempotent operation - as long as the value
// of the token doesn't change.
for _, index := range s.scheduledIndexes {
for _, index := range scheduledIndexes {
if index.Definition.DefnId == token.DefnId {
index.Error = token.Reason
index.ScheduleFailed = true
Expand Down Expand Up @@ -2363,12 +2354,14 @@ func (s *schedTokenMonitor) cleanseIndexes(indexes []*mclient.IndexMetadata,
return newIndexes
}

// getIndexesCached returns the most current list of scheduled indexes.
func (s *schedTokenMonitor) getIndexesCached() []*mclient.IndexMetadata {
s.lock.Lock()
defer s.lock.Unlock()

return s.scheduledIndexes
// getSchedIndexes returns the most current list of scheduled indexes.
func (s *schedTokenMonitor) getSchedIndexes() []*mclient.IndexMetadata {
ptr := atomic.LoadPointer(&s.schedIdxHolder)
if ptr != nil {
return *(*[]*mclient.IndexMetadata)(ptr)
} else {
return nil
}
}

// update sets schedTokenMonitor.indexes to the latest set of scheduled but not yet built indexes
Expand All @@ -2382,18 +2375,24 @@ func (s *schedTokenMonitor) update() {
delPaths := s.listener.GetDeletedScheduleCreateTokenPaths()

if len(createTokens) != 0 || len(stopTokens) != 0 || len(delPaths) != 0 {
s.lock.Lock()
defer s.lock.Unlock()

indexes := s.getIndexesFromTokens(createTokens, stopTokens)
oldSchedIndexes := s.getSchedIndexes()

scheduledIndexes := make([]*mclient.IndexMetadata, len(oldSchedIndexes))
for i, imd := range oldSchedIndexes {
scheduledIndexes[i] = imd.Clone()
}

indexesFromTokens := s.getIndexesFromTokens(createTokens, stopTokens, scheduledIndexes)
scheduledIndexes = append(scheduledIndexes, indexesFromTokens...)
scheduledIndexes = s.cleanseIndexes(scheduledIndexes, stopTokens, delPaths)

// comboIndexes, s.scheduledIndexes updates must be inside comboIndexCacheMut to stay synced
s.metaClient.comboIndexCacheMut.Lock()
defer s.metaClient.comboIndexCacheMut.Unlock()

atomic.StorePointer(&s.metaClient.comboIndexCache, nil)
s.scheduledIndexes = append(s.scheduledIndexes, indexes...)
s.scheduledIndexes = s.cleanseIndexes(s.scheduledIndexes, stopTokens, delPaths)
atomic.StorePointer(&s.schedIdxHolder, unsafe.Pointer(&scheduledIndexes))
}
}

Expand Down

0 comments on commit cca8d6d

Please sign in to comment.