diff --git a/secondary/manager/client/defn.go b/secondary/manager/client/defn.go
index 2e07bd74f..10229baca 100644
--- a/secondary/manager/client/defn.go
+++ b/secondary/manager/client/defn.go
@@ -12,6 +12,8 @@ package client
 import (
 	"encoding/json"
 	"fmt"
+	"sync/atomic"
+	"unsafe"
 
 	"github.com/couchbase/gometa/common"
 	c "github.com/couchbase/indexing/secondary/common"
@@ -54,6 +56,7 @@ const (
 	OPCODE_CHECK_TOKEN_EXIST       = OPCODE_GET_REPLICA_COUNT + 1
 	OPCODE_RESET_INDEX_ON_ROLLBACK = OPCODE_CHECK_TOKEN_EXIST + 1
 	OPCODE_DELETE_COLLECTION       = OPCODE_RESET_INDEX_ON_ROLLBACK + 1
+	OPCODE_CLIENT_STATS            = OPCODE_DELETE_COLLECTION + 1
 )
 
 func Op2String(op common.OpCode) string {
@@ -118,6 +121,8 @@ func Op2String(op common.OpCode) string {
 		return "OPCODE_RESET_INDEX_ON_ROLLBACK"
 	case OPCODE_DELETE_COLLECTION:
 		return "OPCODE_DELETE_COLLECTION"
+	case OPCODE_CLIENT_STATS:
+		return "OPCODE_CLIENT_STATS"
 	}
 	return fmt.Sprintf("%v", op)
 }
@@ -185,6 +190,49 @@ type PerIndexStats struct {
 	// resident_percent and other stats will come here
 }
 
+type IndexStats2Holder struct {
+	ptr unsafe.Pointer
+}
+
+func (p *IndexStats2Holder) Get() *IndexStats2 {
+	val := (atomic.LoadPointer(&p.ptr))
+	if val == nil {
+		return &IndexStats2{}
+	}
+	return (*IndexStats2)(val)
+}
+
+func (p *IndexStats2Holder) Set(s *IndexStats2) {
+	atomic.StorePointer(&p.ptr, unsafe.Pointer(s))
+}
+
+func (p *PerIndexStats) Clone() *PerIndexStats {
+	// TODO: Update when adding stats to PerIndexStats
+	return nil
+}
+
+func (d *DedupedIndexStats) Clone() *DedupedIndexStats {
+	clone := &DedupedIndexStats{}
+	clone.NumDocsPending = d.NumDocsPending
+	clone.NumDocsQueued = d.NumDocsQueued
+	clone.LastRollbackTime = d.LastRollbackTime
+	clone.ProgressStatTime = d.ProgressStatTime
+	clone.Indexes = make(map[string]*PerIndexStats)
+	for index, perIndexStats := range d.Indexes {
+		clone.Indexes[index] = perIndexStats.Clone()
+	}
+	return clone
+}
+
+func (s *IndexStats2) Clone() *IndexStats2 {
+	clone := &IndexStats2{}
+	clone.Stats = make(map[string]*DedupedIndexStats)
+	for bucket, stats := range s.Stats {
+		clone.Stats[bucket] = stats.Clone()
+	}
+	return clone
+}
+
 /////////////////////////////////////////////////////////////////////////
 // Create/Alter Index
 ////////////////////////////////////////////////////////////////////////
diff --git a/secondary/manager/client/metadata_provider.go b/secondary/manager/client/metadata_provider.go
index f9c57972d..7338098f4 100644
--- a/secondary/manager/client/metadata_provider.go
+++ b/secondary/manager/client/metadata_provider.go
@@ -136,6 +136,8 @@ type watcher struct {
 	incomingReqs chan *protocol.RequestHandle
 	pendingReqs  map[uint64]*protocol.RequestHandle // key : request id
 	loggedReqs   map[common.Txnid]*protocol.RequestHandle
+
+	clientStats IndexStats2Holder
 }
 
 // With partitioning, index instance is distributed among indexer nodes.
@@ -4933,6 +4935,13 @@ RETRY2:
 		err = w.updateServiceMap(addr)
 	}
 
+	if err == nil {
+		clusterVersion := w.getClusterVersion()
+		if clusterVersion >= c.INDEXER_70_VERSION {
+			err = w.getClientStats()
+		}
+	}
+
 	if err != nil && retry != 0 {
 		ticker := time.NewTicker(time.Duration(500) * time.Millisecond)
 		select {
@@ -5086,6 +5095,37 @@ func (w *watcher) updateIndexStatsNoLock(indexerId c.IndexerId, indexStats *Inde
 
 func (w *watcher) updateIndexStats2NoLock(indexerId c.IndexerId, indexStats2 *IndexStats2) map[c.IndexInstId]map[c.PartitionId]c.Statistics {
 
+	useCached := false
+	for _, dedupedIndexStats := range indexStats2.Stats {
+		if len(dedupedIndexStats.Indexes) == 0 {
+			useCached = true
+			break
+		}
+	}
+
+	// Check if the cached client stats are updated from OPCODE_CLIENT_STATS
+	// It is possible that client is yet to receive the response to OPCODE_CLIENT_STATS
+	// while watcher has already got a notification for stats (a corner case)
+	clientStats := w.clientStats.Get()
+	if useCached == true {
+		if len(clientStats.Stats) == 0 {
+			return nil
+		} else {
+			for _, clientIndexStats := range clientStats.Stats {
+				if len(clientIndexStats.Indexes) == 0 {
+					return nil
+				}
+			}
+		}
+	}
+
+	if useCached {
+		// Update the Indexes list from cached version
+		for bucket, dedupedIndexStats := range indexStats2.Stats {
+			dedupedIndexStats.Indexes = clientStats.Stats[bucket].Indexes
+		}
+	}
+
 	stats := (map[c.IndexInstId]map[c.PartitionId]c.Statistics)(nil)
 	if indexStats2 != nil && len(indexStats2.Stats) != 0 {
 		stats = w.provider.repo.resolveIndexStats2(indexerId, indexStats2.Stats)
@@ -5094,6 +5134,10 @@ func (w *watcher) updateIndexStats2NoLock(indexerId c.IndexerId, indexStats2 *In
 		}
 	}
 
+	if useCached == false {
+		w.clientStats.Set(indexStats2.Clone())
+	}
+
 	return stats
 }
 
@@ -5253,6 +5297,25 @@ func (w *watcher) refreshServiceMap() error {
 	return nil
 }
 
+func (w *watcher) getClientStats() error {
+
+	content, err := w.makeRequest(OPCODE_CLIENT_STATS, "Client Stats", []byte(""))
+	if err != nil {
+		logging.Errorf("watcher.getClientStats() %s", err)
+		return err
+	}
+
+	clientStats, err := UnmarshallIndexStats2(content)
+	if err != nil {
+		logging.Errorf("watcher.getClientStats() %s", err)
+		return err
+	}
+
+	w.clientStats.Set(clientStats)
+
+	return nil
+}
+
 func (w *watcher) cleanupIndices(repo *metadataRepo) {
 
 	w.mutex.Lock()
diff --git a/secondary/manager/lifecycle.go b/secondary/manager/lifecycle.go
index 791cac852..99870133d 100644
--- a/secondary/manager/lifecycle.go
+++ b/secondary/manager/lifecycle.go
@@ -57,6 +57,9 @@ type LifecycleMgr struct {
 	done             sync.WaitGroup
 	isDone           bool
 	collAwareCluster uint32 // 0: false, 1: true
+
+	lastSendClientStats *client.IndexStats2
+	clientStatsMutex    sync.Mutex
 }
 
 type requestHolder struct {
@@ -151,15 +154,17 @@ func NewLifecycleMgr(notifier MetadataNotifier, clusterURL string) (*LifecycleMg
 
 	cinfo.SetUserAgent("LifecycleMgr")
 	mgr := &LifecycleMgr{repo: nil,
-		cinfo:        cinfo,
-		notifier:     notifier,
-		clusterURL:   clusterURL,
-		incomings:    make(chan *requestHolder, 100000),
-		expedites:    make(chan *requestHolder, 100000),
-		outgoings:    make(chan c.Packet, 100000),
-		killch:       make(chan bool),
-		bootstraps:   make(chan *requestHolder, 1000),
-		indexerReady: false}
+		cinfo:               cinfo,
+		notifier:            notifier,
+		clusterURL:          clusterURL,
+		incomings:           make(chan *requestHolder, 100000),
+		expedites:           make(chan *requestHolder, 100000),
+		outgoings:           make(chan c.Packet, 100000),
+		killch:              make(chan bool),
+		bootstraps:          make(chan *requestHolder, 1000),
+		indexerReady:        false,
+		lastSendClientStats: &client.IndexStats2{},
+	}
 
 	if cinfo.GetClusterVersion() >= common.INDEXER_70_VERSION {
 		atomic.StoreUint32(&mgr.collAwareCluster, 1)
@@ -423,6 +428,8 @@ func (m *LifecycleMgr) dispatchRequest(request *requestHolder, factory *message.
 		result, err = m.handleGetIndexReplicaCount(content)
 	case client.OPCODE_CHECK_TOKEN_EXIST:
 		result, err = m.handleCheckTokenExist(content)
+	case client.OPCODE_CLIENT_STATS:
+		result, err = m.handleClientStats(content)
 	}
 
 	logging.Debugf("LifecycleMgr.dispatchRequest () : send response for requestId %d, op %d, len(result) %d", reqId, op, len(result))
@@ -2309,7 +2316,13 @@ func (m *LifecycleMgr) broadcastStats() {
 				}
 			} else {
 				idxStats2 := convertToIndexStats2(*filtered)
-				if err := m.repo.BroadcastIndexStats2(idxStats2); err != nil {
+				statsToBroadCast := m.GetDiffFromLastSent(idxStats2)
+
+				m.clientStatsMutex.Lock()
+				m.lastSendClientStats = idxStats2
+				m.clientStatsMutex.Unlock()
+
+				if err := m.repo.BroadcastIndexStats2(statsToBroadCast); err != nil {
 					logging.Errorf("lifecycleMgr: fail to send indexStats2. Error = %v", err)
 				}
 			}
@@ -2323,6 +2336,46 @@ func (m *LifecycleMgr) broadcastStats() {
 	}
 }
 
+// TODO: This method can further be optimized to broadcast only incremental changes.
+// The IndexStats2 data structure must be modified to contain deleted indexes list
+// and current indexes map with storage stats
+// Currently, this method broadcasts full set of stats if there is any change in buckets
+// or indexes per bucket
+func (m *LifecycleMgr) GetDiffFromLastSent(currStats *client.IndexStats2) *client.IndexStats2 {
+	m.clientStatsMutex.Lock()
+	defer m.clientStatsMutex.Unlock()
+
+	if len(m.lastSendClientStats.Stats) != len(currStats.Stats) {
+		return currStats
+	}
+
+	statsToBroadCast := &client.IndexStats2{}
+	statsToBroadCast.Stats = make(map[string]*client.DedupedIndexStats)
+
+	for bucket, lastSentDeduped := range m.lastSendClientStats.Stats {
+		if currDeduped, ok := currStats.Stats[bucket]; !ok {
+			return currStats
+		} else {
+			if len(currDeduped.Indexes) != len(lastSentDeduped.Indexes) {
+				return currStats
+			}
+			for indexName, _ := range lastSentDeduped.Indexes {
+				if _, ok := currDeduped.Indexes[indexName]; !ok {
+					return currStats
+				}
+			}
+		}
+		statsToBroadCast.Stats[bucket] = &client.DedupedIndexStats{}
+		statsToBroadCast.Stats[bucket].NumDocsPending = currStats.Stats[bucket].NumDocsPending
+		statsToBroadCast.Stats[bucket].NumDocsQueued = currStats.Stats[bucket].NumDocsQueued
+		statsToBroadCast.Stats[bucket].LastRollbackTime = currStats.Stats[bucket].LastRollbackTime
+		statsToBroadCast.Stats[bucket].ProgressStatTime = currStats.Stats[bucket].ProgressStatTime
+		statsToBroadCast.Stats[bucket].Indexes = nil
+	}
+
+	return statsToBroadCast
+}
+
 // This method takes in common.Statistics as input,
 // de-duplicates the stats and converts them to client.IndexStats2 format
 func convertToIndexStats2(stats common.Statistics) *client.IndexStats2 {
@@ -2369,6 +2422,16 @@ func convertToIndexStats2(stats common.Statistics) *client.IndexStats2 {
 	return indexStats2
 }
 
+//-----------------------------------------------------------
+// Client Stats
+//-----------------------------------------------------------
+func (m *LifecycleMgr) handleClientStats(content []byte) ([]byte, error) {
+	m.clientStatsMutex.Lock()
+	defer m.clientStatsMutex.Unlock()
+
+	return client.MarshallIndexStats2(m.lastSendClientStats)
+}
+
 //-----------------------------------------------------------
 // Reset Index (for upgrade)
 //-----------------------------------------------------------
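For reference, outside the patch itself: a minimal, self-contained Go sketch of the atomic-pointer "holder" pattern that the new IndexStats2Holder type relies on. The StatsSnapshot type, snapshotHolder name, and main function below are hypothetical stand-ins, not part of the indexing codebase; the idea is that a writer publishes a freshly built (or cloned) snapshot with Set, and readers call Get without taking any lock.

package main

import (
	"fmt"
	"sync/atomic"
	"unsafe"
)

// StatsSnapshot is a hypothetical stand-in for client.IndexStats2.
type StatsSnapshot struct {
	NumDocsPending float64
}

// snapshotHolder mirrors IndexStats2Holder: an unsafe.Pointer that is
// only ever read and written atomically.
type snapshotHolder struct {
	ptr unsafe.Pointer // nil, or a *StatsSnapshot
}

// Get returns the last published snapshot, or an empty one before the first Set.
func (h *snapshotHolder) Get() *StatsSnapshot {
	p := atomic.LoadPointer(&h.ptr)
	if p == nil {
		return &StatsSnapshot{}
	}
	return (*StatsSnapshot)(p)
}

// Set publishes a new snapshot. Only the pointer is swapped, so concurrent
// readers see either the old snapshot or the new one, never a partial write.
func (h *snapshotHolder) Set(s *StatsSnapshot) {
	atomic.StorePointer(&h.ptr, unsafe.Pointer(s))
}

func main() {
	var h snapshotHolder
	fmt.Println(h.Get().NumDocsPending) // 0: empty default before the first publish
	h.Set(&StatsSnapshot{NumDocsPending: 42})
	fmt.Println(h.Get().NumDocsPending) // 42
}

This also shows why updateIndexStats2NoLock in the patch calls w.clientStats.Set(indexStats2.Clone()): the published snapshot is never mutated after Set, so readers of the holder always see a consistent view.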