Skip to content

Commit

Permalink
MB-39658 Do not broadcast list of indexes if there is no change
Browse files Browse the repository at this point in the history
in list of indexes or buckets

The protocol is as follows:
a. When a watcher starts, it sends a message to indexer and get's
the client stats and caches them
b. From then on, when ever watcher gets stats, it checks if the map
of index names is empty or not
c. If the map is empty, then it will retrieve the name of indexes
from the cached client stats
d. Otherwise, watcher will update the cached client stats with
the index names present in the broadcasted message

Change-Id: I57d946e0fbd06edd8d161b855d4ae18d64e9b9b9
  • Loading branch information
varunv-cb committed Jul 18, 2020
1 parent fd3deca commit d69bcb6
Show file tree
Hide file tree
Showing 3 changed files with 184 additions and 10 deletions.
48 changes: 48 additions & 0 deletions secondary/manager/client/defn.go
Expand Up @@ -12,6 +12,8 @@ package client
import (
"encoding/json"
"fmt"
"sync/atomic"
"unsafe"

"github.com/couchbase/gometa/common"
c "github.com/couchbase/indexing/secondary/common"
Expand Down Expand Up @@ -54,6 +56,7 @@ const (
OPCODE_CHECK_TOKEN_EXIST = OPCODE_GET_REPLICA_COUNT + 1
OPCODE_RESET_INDEX_ON_ROLLBACK = OPCODE_CHECK_TOKEN_EXIST + 1
OPCODE_DELETE_COLLECTION = OPCODE_RESET_INDEX_ON_ROLLBACK + 1
OPCODE_CLIENT_STATS = OPCODE_DELETE_COLLECTION + 1
)

func Op2String(op common.OpCode) string {
Expand Down Expand Up @@ -118,6 +121,8 @@ func Op2String(op common.OpCode) string {
return "OPCODE_RESET_INDEX_ON_ROLLBACK"
case OPCODE_DELETE_COLLECTION:
return "OPCODE_DELETE_COLLECTION"
case OPCODE_CLIENT_STATS:
return "OPCODE_CLIENT_STATS"
}
return fmt.Sprintf("%v", op)
}
Expand Down Expand Up @@ -185,6 +190,49 @@ type PerIndexStats struct {
// resident_percent and other stats will come here
}

type IndexStats2Holder struct {
ptr unsafe.Pointer
}

func (p *IndexStats2Holder) Get() *IndexStats2 {
val := (atomic.LoadPointer(&p.ptr))
if val == nil {
return &IndexStats2{}
}
return (*IndexStats2)(val)
}

func (p *IndexStats2Holder) Set(s *IndexStats2) {
atomic.StorePointer(&p.ptr, unsafe.Pointer(s))
}

func (p *PerIndexStats) Clone() *PerIndexStats {
// TODO: Update when adding stats to PerIndexStats
return nil
}

func (d *DedupedIndexStats) Clone() *DedupedIndexStats {
clone := &DedupedIndexStats{}
clone.NumDocsPending = d.NumDocsPending
clone.NumDocsQueued = d.NumDocsQueued
clone.LastRollbackTime = d.LastRollbackTime
clone.ProgressStatTime = d.ProgressStatTime
clone.Indexes = make(map[string]*PerIndexStats)
for index, perIndexStats := range d.Indexes {
clone.Indexes[index] = perIndexStats.Clone()
}
return clone
}

func (s *IndexStats2) Clone() *IndexStats2 {
clone := &IndexStats2{}
clone.Stats = make(map[string]*DedupedIndexStats)
for bucket, stats := range s.Stats {
clone.Stats[bucket] = stats.Clone()
}
return clone
}

/////////////////////////////////////////////////////////////////////////
// Create/Alter Index
////////////////////////////////////////////////////////////////////////
Expand Down
63 changes: 63 additions & 0 deletions secondary/manager/client/metadata_provider.go
Expand Up @@ -136,6 +136,8 @@ type watcher struct {
incomingReqs chan *protocol.RequestHandle
pendingReqs map[uint64]*protocol.RequestHandle // key : request id
loggedReqs map[common.Txnid]*protocol.RequestHandle

clientStats IndexStats2Holder
}

// With partitioning, index instance is distributed among indexer nodes.
Expand Down Expand Up @@ -4933,6 +4935,13 @@ RETRY2:
err = w.updateServiceMap(addr)
}

if err == nil {
clusterVersion := w.getClusterVersion()
if clusterVersion >= c.INDEXER_70_VERSION {
err = w.getClientStats()
}
}

if err != nil && retry != 0 {
ticker := time.NewTicker(time.Duration(500) * time.Millisecond)
select {
Expand Down Expand Up @@ -5086,6 +5095,37 @@ func (w *watcher) updateIndexStatsNoLock(indexerId c.IndexerId, indexStats *Inde

func (w *watcher) updateIndexStats2NoLock(indexerId c.IndexerId, indexStats2 *IndexStats2) map[c.IndexInstId]map[c.PartitionId]c.Statistics {

useCached := false
for _, dedupedIndexStats := range indexStats2.Stats {
if len(dedupedIndexStats.Indexes) == 0 {
useCached = true
break
}
}

// Check if the cached client stats are updated from OPCODE_CLIENT_STATS
// It is possible that client is yet to receive the response to OPCODE_CLIENT_STATS
// while watcher has already got a notification for stats (a corner case)
clientStats := w.clientStats.Get()
if useCached == true {
if len(clientStats.Stats) == 0 {
return nil
} else {
for _, clientIndexStats := range clientStats.Stats {
if len(clientIndexStats.Indexes) == 0 {
return nil
}
}
}
}

if useCached {
// Update the Indexes list from cached version
for bucket, dedupedIndexStats := range indexStats2.Stats {
dedupedIndexStats.Indexes = clientStats.Stats[bucket].Indexes
}
}

stats := (map[c.IndexInstId]map[c.PartitionId]c.Statistics)(nil)
if indexStats2 != nil && len(indexStats2.Stats) != 0 {
stats = w.provider.repo.resolveIndexStats2(indexerId, indexStats2.Stats)
Expand All @@ -5094,6 +5134,10 @@ func (w *watcher) updateIndexStats2NoLock(indexerId c.IndexerId, indexStats2 *In
}
}

if useCached == false {
w.clientStats.Set(indexStats2.Clone())
}

return stats
}

Expand Down Expand Up @@ -5253,6 +5297,25 @@ func (w *watcher) refreshServiceMap() error {
return nil
}

func (w *watcher) getClientStats() error {

content, err := w.makeRequest(OPCODE_CLIENT_STATS, "Client Stats", []byte(""))
if err != nil {
logging.Errorf("watcher.getClientStats() %s", err)
return err
}

clientStats, err := UnmarshallIndexStats2(content)
if err != nil {
logging.Errorf("watcher.getClientStats() %s", err)
return err
}

w.clientStats.Set(clientStats)

return nil
}

func (w *watcher) cleanupIndices(repo *metadataRepo) {

w.mutex.Lock()
Expand Down
83 changes: 73 additions & 10 deletions secondary/manager/lifecycle.go
Expand Up @@ -57,6 +57,9 @@ type LifecycleMgr struct {
done sync.WaitGroup
isDone bool
collAwareCluster uint32 // 0: false, 1: true

lastSendClientStats *client.IndexStats2
clientStatsMutex sync.Mutex
}

type requestHolder struct {
Expand Down Expand Up @@ -151,15 +154,17 @@ func NewLifecycleMgr(notifier MetadataNotifier, clusterURL string) (*LifecycleMg
cinfo.SetUserAgent("LifecycleMgr")

mgr := &LifecycleMgr{repo: nil,
cinfo: cinfo,
notifier: notifier,
clusterURL: clusterURL,
incomings: make(chan *requestHolder, 100000),
expedites: make(chan *requestHolder, 100000),
outgoings: make(chan c.Packet, 100000),
killch: make(chan bool),
bootstraps: make(chan *requestHolder, 1000),
indexerReady: false}
cinfo: cinfo,
notifier: notifier,
clusterURL: clusterURL,
incomings: make(chan *requestHolder, 100000),
expedites: make(chan *requestHolder, 100000),
outgoings: make(chan c.Packet, 100000),
killch: make(chan bool),
bootstraps: make(chan *requestHolder, 1000),
indexerReady: false,
lastSendClientStats: &client.IndexStats2{},
}

if cinfo.GetClusterVersion() >= common.INDEXER_70_VERSION {
atomic.StoreUint32(&mgr.collAwareCluster, 1)
Expand Down Expand Up @@ -423,6 +428,8 @@ func (m *LifecycleMgr) dispatchRequest(request *requestHolder, factory *message.
result, err = m.handleGetIndexReplicaCount(content)
case client.OPCODE_CHECK_TOKEN_EXIST:
result, err = m.handleCheckTokenExist(content)
case client.OPCODE_CLIENT_STATS:
result, err = m.handleClientStats(content)
}

logging.Debugf("LifecycleMgr.dispatchRequest () : send response for requestId %d, op %d, len(result) %d", reqId, op, len(result))
Expand Down Expand Up @@ -2309,7 +2316,13 @@ func (m *LifecycleMgr) broadcastStats() {
}
} else {
idxStats2 := convertToIndexStats2(*filtered)
if err := m.repo.BroadcastIndexStats2(idxStats2); err != nil {
statsToBroadCast := m.GetDiffFromLastSent(idxStats2)

m.clientStatsMutex.Lock()
m.lastSendClientStats = idxStats2
m.clientStatsMutex.Unlock()

if err := m.repo.BroadcastIndexStats2(statsToBroadCast); err != nil {
logging.Errorf("lifecycleMgr: fail to send indexStats2. Error = %v", err)
}
}
Expand All @@ -2323,6 +2336,46 @@ func (m *LifecycleMgr) broadcastStats() {
}
}

// TODO: This method can further be optimized to broadcast only incremental changes.
// The IndexStats2 data structure must be modified to contain deleted indexes list
// and current indexes map with storage stats
// Currently, this method broadcasts full set of stats if there is any change in buckets
// or indexes per bucket
func (m *LifecycleMgr) GetDiffFromLastSent(currStats *client.IndexStats2) *client.IndexStats2 {
m.clientStatsMutex.Lock()
defer m.clientStatsMutex.Unlock()

if len(m.lastSendClientStats.Stats) != len(currStats.Stats) {
return currStats
}

statsToBroadCast := &client.IndexStats2{}
statsToBroadCast.Stats = make(map[string]*client.DedupedIndexStats)

for bucket, lastSentDeduped := range m.lastSendClientStats.Stats {
if currDeduped, ok := currStats.Stats[bucket]; !ok {
return currStats
} else {
if len(currDeduped.Indexes) != len(lastSentDeduped.Indexes) {
return currStats
}
for indexName, _ := range lastSentDeduped.Indexes {
if _, ok := currDeduped.Indexes[indexName]; !ok {
return currStats
}
}
}
statsToBroadCast.Stats[bucket] = &client.DedupedIndexStats{}
statsToBroadCast.Stats[bucket].NumDocsPending = currStats.Stats[bucket].NumDocsPending
statsToBroadCast.Stats[bucket].NumDocsQueued = currStats.Stats[bucket].NumDocsQueued
statsToBroadCast.Stats[bucket].LastRollbackTime = currStats.Stats[bucket].LastRollbackTime
statsToBroadCast.Stats[bucket].ProgressStatTime = currStats.Stats[bucket].ProgressStatTime
statsToBroadCast.Stats[bucket].Indexes = nil
}

return statsToBroadCast
}

// This method takes in common.Statistics as input,
// de-duplicates the stats and converts them to client.IndexStats2 format
func convertToIndexStats2(stats common.Statistics) *client.IndexStats2 {
Expand Down Expand Up @@ -2369,6 +2422,16 @@ func convertToIndexStats2(stats common.Statistics) *client.IndexStats2 {
return indexStats2
}

//-----------------------------------------------------------
// Client Stats
//-----------------------------------------------------------
func (m *LifecycleMgr) handleClientStats(content []byte) ([]byte, error) {
m.clientStatsMutex.Lock()
defer m.clientStatsMutex.Unlock()

return client.MarshallIndexStats2(m.lastSendClientStats)
}

//-----------------------------------------------------------
// Reset Index (for upgrade)
//-----------------------------------------------------------
Expand Down

0 comments on commit d69bcb6

Please sign in to comment.