Skip to content

Commit

Permalink
MB-41547 Process stats in storage manager asynchronously
Browse files Browse the repository at this point in the history
And serialise stats processing with rollback to avoid
any inconsistency in stats, as rollback can reset some
of the stats.

Change-Id: I75af0f07540469588ffd2d5355ad3832cf6ca257
  • Loading branch information
varunv-cb committed Apr 6, 2021
1 parent bb2047f commit 731ac71
Showing 1 changed file with 104 additions and 85 deletions.
189 changes: 104 additions & 85 deletions secondary/indexer/storage_manager.go
Expand Up @@ -73,6 +73,8 @@ type storageMgr struct {

muSnap sync.Mutex //lock to protect updates to snapMap and waitersMap

statsLock sync.Mutex

lastFlushDone int64
}

Expand Down Expand Up @@ -746,6 +748,13 @@ func (sm *storageMgr) handleRollback(cmd Message) {

sm.supvCmdch <- &MsgSuccess{}

// During rollback, some of the snapshot stats get reset
// or updated by slice. Therefore, serialise rollback and
// retrieving stats from slice to avoid any inconsistency
// in stats
sm.statsLock.Lock()
defer sm.statsLock.Unlock()

streamId := cmd.(*MsgRollback).GetStreamId()
rollbackTs := cmd.(*MsgRollback).GetRollbackTs()
keyspaceId := cmd.(*MsgRollback).GetKeyspaceId()
Expand Down Expand Up @@ -1342,105 +1351,115 @@ func (s *storageMgr) listenSnapshotReqs(index int) {

// handleGetIndexStorageStats retrieves per-index storage stats and delivers
// them on the requester's reply channel.
//
// The command is acknowledged immediately and the stats gathering runs in a
// goroutine so the storage manager's command loop is not blocked. statsLock
// serialises this work with rollback (see handleRollback), because rollback
// can reset or update slice-level stats and reading them concurrently would
// yield inconsistent values.
//
// NOTE(review): the diff residue showed old and new bodies interleaved; this
// is the reconstructed post-commit version.
func (s *storageMgr) handleGetIndexStorageStats(cmd Message) {
	s.supvCmdch <- &MsgSuccess{}
	go func() { // Process storage stats asynchronously
		s.statsLock.Lock()
		defer s.statsLock.Unlock()

		req := cmd.(*MsgIndexStorageStats)
		replych := req.GetReplyChannel()
		spec := req.GetStatsSpec()
		stats := s.getIndexStorageStats(spec)
		replych <- stats
	}()
}

func (s *storageMgr) handleStats(cmd Message) {
s.supvCmdch <- &MsgSuccess{}

req := cmd.(*MsgStatsRequest)
replych := req.GetReplyChannel()
storageStats := s.getIndexStorageStats(nil)
go func() {
s.statsLock.Lock()
defer s.statsLock.Unlock()

//node level stats
var numStorageInstances int64
var totalDataSize, totalDiskSize, totalRecsInMem, totalRecsOnDisk int64
var avgMutationRate, avgDrainRate, avgDiskBps int64
req := cmd.(*MsgStatsRequest)
replych := req.GetReplyChannel()
storageStats := s.getIndexStorageStats(nil)

stats := s.stats.Get()
indexInstMap := s.indexInstMap.Get()
for _, st := range storageStats {
inst := indexInstMap[st.InstId]
if inst.State == common.INDEX_STATE_DELETED {
continue
}
//node level stats
var numStorageInstances int64
var totalDataSize, totalDiskSize, totalRecsInMem, totalRecsOnDisk int64
var avgMutationRate, avgDrainRate, avgDiskBps int64

numStorageInstances++

idxStats := stats.GetPartitionStats(st.InstId, st.PartnId)
// TODO(sarath): Investigate the reason for inconsistent stats map
// This nil check is a workaround to avoid indexer crashes for now.
if idxStats != nil {
idxStats.dataSize.Set(st.Stats.DataSize)
idxStats.dataSizeOnDisk.Set(st.Stats.DataSizeOnDisk)
idxStats.logSpaceOnDisk.Set(st.Stats.LogSpace)
idxStats.diskSize.Set(st.Stats.DiskSize)
idxStats.memUsed.Set(st.Stats.MemUsed)
if common.GetStorageMode() != common.MOI {
if common.GetStorageMode() == common.PLASMA {
idxStats.fragPercent.Set(int64(st.getPlasmaFragmentation()))
} else {
idxStats.fragPercent.Set(int64(st.GetFragmentation()))
}
stats := s.stats.Get()
indexInstMap := s.indexInstMap.Get()
for _, st := range storageStats {
inst := indexInstMap[st.InstId]
if inst.State == common.INDEX_STATE_DELETED {
continue
}

idxStats.getBytes.Set(st.Stats.GetBytes)
idxStats.insertBytes.Set(st.Stats.InsertBytes)
idxStats.deleteBytes.Set(st.Stats.DeleteBytes)

// compute mutation rate
now := time.Now().UnixNano()
elapsed := float64(now-idxStats.lastMutateGatherTime.Value()) / float64(time.Second)
if elapsed > 60 {
numDocsIndexed := idxStats.numDocsIndexed.Value()
mutationRate := float64(numDocsIndexed-idxStats.lastNumDocsIndexed.Value()) / elapsed
idxStats.avgMutationRate.Set(int64((mutationRate + float64(idxStats.avgMutationRate.Value())) / 2))
idxStats.lastNumDocsIndexed.Set(numDocsIndexed)

numItemsFlushed := idxStats.numItemsFlushed.Value()
drainRate := float64(numItemsFlushed-idxStats.lastNumItemsFlushed.Value()) / elapsed
idxStats.avgDrainRate.Set(int64((drainRate + float64(idxStats.avgDrainRate.Value())) / 2))
idxStats.lastNumItemsFlushed.Set(numItemsFlushed)

diskBytes := idxStats.getBytes.Value() + idxStats.insertBytes.Value() + idxStats.deleteBytes.Value()
diskBps := float64(diskBytes-idxStats.lastDiskBytes.Value()) / elapsed
idxStats.avgDiskBps.Set(int64((diskBps + float64(idxStats.avgDiskBps.Value())) / 2))
idxStats.lastDiskBytes.Set(diskBytes)

logging.Debugf("StorageManager.handleStats: partition %v DiskBps %v avgDiskBps %v drain rate %v",
st.PartnId, diskBps, idxStats.avgDiskBps.Value(), idxStats.avgDrainRate.Value())

idxStats.lastMutateGatherTime.Set(now)
}
numStorageInstances++

//compute node level stats
totalDataSize += st.Stats.DataSize
totalDiskSize += st.Stats.DiskSize
totalRecsInMem += idxStats.numRecsInMem.Value()
totalRecsOnDisk += idxStats.numRecsOnDisk.Value()
avgMutationRate += idxStats.avgMutationRate.Value()
avgDrainRate += idxStats.avgDrainRate.Value()
avgDiskBps += idxStats.avgDiskBps.Value()
idxStats := stats.GetPartitionStats(st.InstId, st.PartnId)
// TODO(sarath): Investigate the reason for inconsistent stats map
// This nil check is a workaround to avoid indexer crashes for now.
if idxStats != nil {
idxStats.dataSize.Set(st.Stats.DataSize)
idxStats.dataSizeOnDisk.Set(st.Stats.DataSizeOnDisk)
idxStats.logSpaceOnDisk.Set(st.Stats.LogSpace)
idxStats.diskSize.Set(st.Stats.DiskSize)
idxStats.memUsed.Set(st.Stats.MemUsed)
if common.GetStorageMode() != common.MOI {
if common.GetStorageMode() == common.PLASMA {
idxStats.fragPercent.Set(int64(st.getPlasmaFragmentation()))
} else {
idxStats.fragPercent.Set(int64(st.GetFragmentation()))
}
}

idxStats.getBytes.Set(st.Stats.GetBytes)
idxStats.insertBytes.Set(st.Stats.InsertBytes)
idxStats.deleteBytes.Set(st.Stats.DeleteBytes)

// compute mutation rate
now := time.Now().UnixNano()
elapsed := float64(now-idxStats.lastMutateGatherTime.Value()) / float64(time.Second)
if elapsed > 60 {
numDocsIndexed := idxStats.numDocsIndexed.Value()
mutationRate := float64(numDocsIndexed-idxStats.lastNumDocsIndexed.Value()) / elapsed
idxStats.avgMutationRate.Set(int64((mutationRate + float64(idxStats.avgMutationRate.Value())) / 2))
idxStats.lastNumDocsIndexed.Set(numDocsIndexed)

numItemsFlushed := idxStats.numItemsFlushed.Value()
drainRate := float64(numItemsFlushed-idxStats.lastNumItemsFlushed.Value()) / elapsed
idxStats.avgDrainRate.Set(int64((drainRate + float64(idxStats.avgDrainRate.Value())) / 2))
idxStats.lastNumItemsFlushed.Set(numItemsFlushed)

diskBytes := idxStats.getBytes.Value() + idxStats.insertBytes.Value() + idxStats.deleteBytes.Value()
diskBps := float64(diskBytes-idxStats.lastDiskBytes.Value()) / elapsed
idxStats.avgDiskBps.Set(int64((diskBps + float64(idxStats.avgDiskBps.Value())) / 2))
idxStats.lastDiskBytes.Set(diskBytes)

logging.Debugf("StorageManager.handleStats: partition %v DiskBps %v avgDiskBps %v drain rate %v",
st.PartnId, diskBps, idxStats.avgDiskBps.Value(), idxStats.avgDrainRate.Value())

idxStats.lastMutateGatherTime.Set(now)
}

//compute node level stats
totalDataSize += st.Stats.DataSize
totalDiskSize += st.Stats.DiskSize
totalRecsInMem += idxStats.numRecsInMem.Value()
totalRecsOnDisk += idxStats.numRecsOnDisk.Value()
avgMutationRate += idxStats.avgMutationRate.Value()
avgDrainRate += idxStats.avgDrainRate.Value()
avgDiskBps += idxStats.avgDiskBps.Value()
}
}
}

stats.totalDataSize.Set(totalDataSize)
stats.totalDiskSize.Set(totalDiskSize)
stats.numStorageInstances.Set(numStorageInstances)
stats.avgMutationRate.Set(avgMutationRate)
stats.avgDrainRate.Set(avgDrainRate)
stats.avgDiskBps.Set(avgDiskBps)
if numStorageInstances > 0 {
stats.avgResidentPercent.Set(common.ComputePercent(totalRecsInMem, totalRecsOnDisk))
} else {
stats.avgResidentPercent.Set(0)
}
stats.totalDataSize.Set(totalDataSize)
stats.totalDiskSize.Set(totalDiskSize)
stats.numStorageInstances.Set(numStorageInstances)
stats.avgMutationRate.Set(avgMutationRate)
stats.avgDrainRate.Set(avgDrainRate)
stats.avgDiskBps.Set(avgDiskBps)
if numStorageInstances > 0 {
stats.avgResidentPercent.Set(common.ComputePercent(totalRecsInMem, totalRecsOnDisk))
} else {
stats.avgResidentPercent.Set(0)
}

replych <- true
replych <- true
}()
}

func (s *storageMgr) getIndexStorageStats(spec *statsSpec) []IndexStorageStats {
Expand Down

0 comments on commit 731ac71

Please sign in to comment.