From 731ac71890619f6b437c5b1e906cc0939a915489 Mon Sep 17 00:00:00 2001 From: Varun Velamuri Date: Fri, 19 Mar 2021 11:31:19 +0530 Subject: [PATCH] MB-41547 Process stats in storage manager asyncronously And serialise stats processing with rollback to avoid any inconsistency in stats as rollback can reset some of the stats Change-Id: I75af0f07540469588ffd2d5355ad3832cf6ca257 --- secondary/indexer/storage_manager.go | 189 +++++++++++++++------------ 1 file changed, 104 insertions(+), 85 deletions(-) diff --git a/secondary/indexer/storage_manager.go b/secondary/indexer/storage_manager.go index 4dbe659aa..a5e452740 100644 --- a/secondary/indexer/storage_manager.go +++ b/secondary/indexer/storage_manager.go @@ -73,6 +73,8 @@ type storageMgr struct { muSnap sync.Mutex //lock to protect updates to snapMap and waitersMap + statsLock sync.Mutex + lastFlushDone int64 } @@ -746,6 +748,13 @@ func (sm *storageMgr) handleRollback(cmd Message) { sm.supvCmdch <- &MsgSuccess{} + // During rollback, some of the snapshot stats get reset + // or updated by slice. Therefore, serialise rollback and + // retrieving stats from slice to avoid any inconsistency + // in stats + sm.statsLock.Lock() + defer sm.statsLock.Unlock() + streamId := cmd.(*MsgRollback).GetStreamId() rollbackTs := cmd.(*MsgRollback).GetRollbackTs() keyspaceId := cmd.(*MsgRollback).GetKeyspaceId() @@ -1342,105 +1351,115 @@ func (s *storageMgr) listenSnapshotReqs(index int) { func (s *storageMgr) handleGetIndexStorageStats(cmd Message) { s.supvCmdch <- &MsgSuccess{} - req := cmd.(*MsgIndexStorageStats) - replych := req.GetReplyChannel() - spec := req.GetStatsSpec() - stats := s.getIndexStorageStats(spec) - replych <- stats + go func() { // Process storage stats asyncronously + s.statsLock.Lock() + defer s.statsLock.Unlock() + + req := cmd.(*MsgIndexStorageStats) + replych := req.GetReplyChannel() + spec := req.GetStatsSpec() + stats := s.getIndexStorageStats(spec) + replych <- stats + }() } func (s *storageMgr) handleStats(cmd Message) { s.supvCmdch <- &MsgSuccess{} - req := cmd.(*MsgStatsRequest) - replych := req.GetReplyChannel() - storageStats := s.getIndexStorageStats(nil) + go func() { + s.statsLock.Lock() + defer s.statsLock.Unlock() - //node level stats - var numStorageInstances int64 - var totalDataSize, totalDiskSize, totalRecsInMem, totalRecsOnDisk int64 - var avgMutationRate, avgDrainRate, avgDiskBps int64 + req := cmd.(*MsgStatsRequest) + replych := req.GetReplyChannel() + storageStats := s.getIndexStorageStats(nil) - stats := s.stats.Get() - indexInstMap := s.indexInstMap.Get() - for _, st := range storageStats { - inst := indexInstMap[st.InstId] - if inst.State == common.INDEX_STATE_DELETED { - continue - } + //node level stats + var numStorageInstances int64 + var totalDataSize, totalDiskSize, totalRecsInMem, totalRecsOnDisk int64 + var avgMutationRate, avgDrainRate, avgDiskBps int64 - numStorageInstances++ - - idxStats := stats.GetPartitionStats(st.InstId, st.PartnId) - // TODO(sarath): Investigate the reason for inconsistent stats map - // This nil check is a workaround to avoid indexer crashes for now. - if idxStats != nil { - idxStats.dataSize.Set(st.Stats.DataSize) - idxStats.dataSizeOnDisk.Set(st.Stats.DataSizeOnDisk) - idxStats.logSpaceOnDisk.Set(st.Stats.LogSpace) - idxStats.diskSize.Set(st.Stats.DiskSize) - idxStats.memUsed.Set(st.Stats.MemUsed) - if common.GetStorageMode() != common.MOI { - if common.GetStorageMode() == common.PLASMA { - idxStats.fragPercent.Set(int64(st.getPlasmaFragmentation())) - } else { - idxStats.fragPercent.Set(int64(st.GetFragmentation())) - } + stats := s.stats.Get() + indexInstMap := s.indexInstMap.Get() + for _, st := range storageStats { + inst := indexInstMap[st.InstId] + if inst.State == common.INDEX_STATE_DELETED { + continue } - idxStats.getBytes.Set(st.Stats.GetBytes) - idxStats.insertBytes.Set(st.Stats.InsertBytes) - idxStats.deleteBytes.Set(st.Stats.DeleteBytes) - - // compute mutation rate - now := time.Now().UnixNano() - elapsed := float64(now-idxStats.lastMutateGatherTime.Value()) / float64(time.Second) - if elapsed > 60 { - numDocsIndexed := idxStats.numDocsIndexed.Value() - mutationRate := float64(numDocsIndexed-idxStats.lastNumDocsIndexed.Value()) / elapsed - idxStats.avgMutationRate.Set(int64((mutationRate + float64(idxStats.avgMutationRate.Value())) / 2)) - idxStats.lastNumDocsIndexed.Set(numDocsIndexed) - - numItemsFlushed := idxStats.numItemsFlushed.Value() - drainRate := float64(numItemsFlushed-idxStats.lastNumItemsFlushed.Value()) / elapsed - idxStats.avgDrainRate.Set(int64((drainRate + float64(idxStats.avgDrainRate.Value())) / 2)) - idxStats.lastNumItemsFlushed.Set(numItemsFlushed) - - diskBytes := idxStats.getBytes.Value() + idxStats.insertBytes.Value() + idxStats.deleteBytes.Value() - diskBps := float64(diskBytes-idxStats.lastDiskBytes.Value()) / elapsed - idxStats.avgDiskBps.Set(int64((diskBps + float64(idxStats.avgDiskBps.Value())) / 2)) - idxStats.lastDiskBytes.Set(diskBytes) - - logging.Debugf("StorageManager.handleStats: partition %v DiskBps %v avgDiskBps %v drain rate %v", - st.PartnId, diskBps, idxStats.avgDiskBps.Value(), idxStats.avgDrainRate.Value()) - - idxStats.lastMutateGatherTime.Set(now) - } + numStorageInstances++ - //compute node level stats - totalDataSize += st.Stats.DataSize - totalDiskSize += st.Stats.DiskSize - totalRecsInMem += idxStats.numRecsInMem.Value() - totalRecsOnDisk += idxStats.numRecsOnDisk.Value() - avgMutationRate += idxStats.avgMutationRate.Value() - avgDrainRate += idxStats.avgDrainRate.Value() - avgDiskBps += idxStats.avgDiskBps.Value() + idxStats := stats.GetPartitionStats(st.InstId, st.PartnId) + // TODO(sarath): Investigate the reason for inconsistent stats map + // This nil check is a workaround to avoid indexer crashes for now. + if idxStats != nil { + idxStats.dataSize.Set(st.Stats.DataSize) + idxStats.dataSizeOnDisk.Set(st.Stats.DataSizeOnDisk) + idxStats.logSpaceOnDisk.Set(st.Stats.LogSpace) + idxStats.diskSize.Set(st.Stats.DiskSize) + idxStats.memUsed.Set(st.Stats.MemUsed) + if common.GetStorageMode() != common.MOI { + if common.GetStorageMode() == common.PLASMA { + idxStats.fragPercent.Set(int64(st.getPlasmaFragmentation())) + } else { + idxStats.fragPercent.Set(int64(st.GetFragmentation())) + } + } + + idxStats.getBytes.Set(st.Stats.GetBytes) + idxStats.insertBytes.Set(st.Stats.InsertBytes) + idxStats.deleteBytes.Set(st.Stats.DeleteBytes) + + // compute mutation rate + now := time.Now().UnixNano() + elapsed := float64(now-idxStats.lastMutateGatherTime.Value()) / float64(time.Second) + if elapsed > 60 { + numDocsIndexed := idxStats.numDocsIndexed.Value() + mutationRate := float64(numDocsIndexed-idxStats.lastNumDocsIndexed.Value()) / elapsed + idxStats.avgMutationRate.Set(int64((mutationRate + float64(idxStats.avgMutationRate.Value())) / 2)) + idxStats.lastNumDocsIndexed.Set(numDocsIndexed) + + numItemsFlushed := idxStats.numItemsFlushed.Value() + drainRate := float64(numItemsFlushed-idxStats.lastNumItemsFlushed.Value()) / elapsed + idxStats.avgDrainRate.Set(int64((drainRate + float64(idxStats.avgDrainRate.Value())) / 2)) + idxStats.lastNumItemsFlushed.Set(numItemsFlushed) + + diskBytes := idxStats.getBytes.Value() + idxStats.insertBytes.Value() + idxStats.deleteBytes.Value() + diskBps := float64(diskBytes-idxStats.lastDiskBytes.Value()) / elapsed + idxStats.avgDiskBps.Set(int64((diskBps + float64(idxStats.avgDiskBps.Value())) / 2)) + idxStats.lastDiskBytes.Set(diskBytes) + + logging.Debugf("StorageManager.handleStats: partition %v DiskBps %v avgDiskBps %v drain rate %v", + st.PartnId, diskBps, idxStats.avgDiskBps.Value(), idxStats.avgDrainRate.Value()) + + idxStats.lastMutateGatherTime.Set(now) + } + + //compute node level stats + totalDataSize += st.Stats.DataSize + totalDiskSize += st.Stats.DiskSize + totalRecsInMem += idxStats.numRecsInMem.Value() + totalRecsOnDisk += idxStats.numRecsOnDisk.Value() + avgMutationRate += idxStats.avgMutationRate.Value() + avgDrainRate += idxStats.avgDrainRate.Value() + avgDiskBps += idxStats.avgDiskBps.Value() + } } - } - stats.totalDataSize.Set(totalDataSize) - stats.totalDiskSize.Set(totalDiskSize) - stats.numStorageInstances.Set(numStorageInstances) - stats.avgMutationRate.Set(avgMutationRate) - stats.avgDrainRate.Set(avgDrainRate) - stats.avgDiskBps.Set(avgDiskBps) - if numStorageInstances > 0 { - stats.avgResidentPercent.Set(common.ComputePercent(totalRecsInMem, totalRecsOnDisk)) - } else { - stats.avgResidentPercent.Set(0) - } + stats.totalDataSize.Set(totalDataSize) + stats.totalDiskSize.Set(totalDiskSize) + stats.numStorageInstances.Set(numStorageInstances) + stats.avgMutationRate.Set(avgMutationRate) + stats.avgDrainRate.Set(avgDrainRate) + stats.avgDiskBps.Set(avgDiskBps) + if numStorageInstances > 0 { + stats.avgResidentPercent.Set(common.ComputePercent(totalRecsInMem, totalRecsOnDisk)) + } else { + stats.avgResidentPercent.Set(0) + } - replych <- true + replych <- true + }() } func (s *storageMgr) getIndexStorageStats(spec *statsSpec) []IndexStorageStats {