Merge remote-tracking branch 'couchbase/unstable' into HEAD
http://ci2i-unstable.northscale.in/gsi-01.03.2021-01.39.pass.html
Change-Id: I6e1e304fe1b47cda704c55220bd5ac8048769dcb
jeelanp2003 committed Mar 1, 2021
2 parents 2a5fe20 + b041d06 commit 5cc41df
Showing 6 changed files with 112 additions and 16 deletions.
6 changes: 6 additions & 0 deletions secondary/indexer/mutation_manager.go
@@ -15,6 +15,7 @@ import (
"strings"
"sync"
"sync/atomic"
"time"
"unsafe"

"github.com/couchbase/indexing/secondary/common"
@@ -1031,6 +1032,7 @@ func (m *mutationMgr) persistMutationQueue(q IndexerMutationQueue,
go func(config common.Config) {
defer m.flusherWaitGroup.Done()

start := time.Now().UnixNano()
flusher := NewFlusher(config, stats)
sts := Timestamp(ts.Seqnos)
msgch := flusher.PersistUptoTS(q.queue, streamId, keyspaceId,
@@ -1063,6 +1065,10 @@ func (m *mutationMgr) persistMutationQueue(q IndexerMutationQueue,
ts: ts,
aborted: true}
}
keyspaceStats := m.stats.GetKeyspaceStats(streamId, keyspaceId)
if keyspaceStats != nil {
keyspaceStats.flushLatDist.Add(time.Now().UnixNano() - start)
}
}(m.config)

}
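Note: the flush latency sample added above follows a simple pattern: capture a start timestamp in nanoseconds before the flusher goroutine does its work, then record the elapsed time into the per-keyspace histogram once the flush (or abort) result has been sent. A minimal standalone sketch of that pattern; latencyRecorder, printRecorder, and flushOnce are hypothetical stand-ins for stats.Histogram and the flusher goroutine in persistMutationQueue, not indexer code.

package main

import (
	"fmt"
	"time"
)

// latencyRecorder stands in for the stats.Histogram behind KeyspaceStats.flushLatDist.
type latencyRecorder interface {
	Add(sampleNs int64)
}

type printRecorder struct{}

func (printRecorder) Add(sampleNs int64) {
	fmt.Printf("flush took %v\n", time.Duration(sampleNs))
}

// flushOnce mirrors the timing added to persistMutationQueue: note the start
// before the flusher runs, record the elapsed nanoseconds when it finishes.
func flushOnce(rec latencyRecorder) {
	start := time.Now().UnixNano()
	time.Sleep(3 * time.Millisecond) // stands in for flusher.PersistUptoTS and draining msgch
	rec.Add(time.Now().UnixNano() - start)
}

func main() {
	flushOnce(printRecorder{})
}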
9 changes: 7 additions & 2 deletions secondary/indexer/scan_coordinator.go
@@ -284,7 +284,9 @@ func (s *scanCoordinator) serverCallback(protoReq interface{}, ctx interface{},
}

if req.Stats != nil {
- req.Stats.scanReqInitDuration.Add(time.Now().Sub(ttime).Nanoseconds())
+ elapsed := time.Now().Sub(ttime).Nanoseconds()
+ req.Stats.scanReqInitDuration.Add(elapsed)
+ req.Stats.scanReqInitLatDist.Add(elapsed)

now := time.Now().UnixNano()
req.Stats.numRequests.Add(1)
@@ -322,7 +324,9 @@ func (s *scanCoordinator) serverCallback(protoReq interface{}, ctx interface{},

defer func() {
if req.Stats != nil {
- req.Stats.scanReqDuration.Add(time.Now().Sub(ttime).Nanoseconds())
+ elapsed := time.Now().Sub(ttime).Nanoseconds()
+ req.Stats.scanReqDuration.Add(elapsed)
+ req.Stats.scanReqLatDist.Add(elapsed)
}
}()

@@ -435,6 +439,7 @@ func (s *scanCoordinator) handleScanRequest(req *ScanRequest, w ScanResponseWrit
req.Stats.scanBytesRead.Add(int64(scanPipeline.BytesRead()))
req.Stats.scanDuration.Add(scanTime.Nanoseconds())
req.Stats.scanWaitDuration.Add(waitTime.Nanoseconds())
req.Stats.scanReqWaitLatDist.Add(waitTime.Nanoseconds())

if req.GroupAggr != nil {
req.Stats.numRowsReturnedAggr.Add(int64(scanPipeline.RowsReturned()))
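Note: both serverCallback changes replace a single Add call with one elapsed-time measurement that feeds two sinks: the pre-existing cumulative duration counter and the new latency distribution. A self-contained sketch of the idea, assuming only that both sinks accept a nanosecond sample; avgDuration and latencyDistSink below are hypothetical local types, not the indexer's stats types.

package main

import (
	"fmt"
	"time"
)

// avgDuration stands in for the cumulative counters (scanReqInitDuration, scanReqDuration).
type avgDuration struct{ sumNs, count int64 }

func (a *avgDuration) Add(ns int64) { a.sumNs += ns; a.count++ }

// latencyDistSink stands in for the new histograms (scanReqInitLatDist, scanReqLatDist).
type latencyDistSink struct{ samples []int64 }

func (d *latencyDistSink) Add(ns int64) { d.samples = append(d.samples, ns) }

func main() {
	var reqDuration avgDuration
	var reqLatDist latencyDistSink

	ttime := time.Now()
	time.Sleep(2 * time.Millisecond) // stands in for handling the scan request

	// Measure once, then feed both the running total and the distribution,
	// mirroring the elapsed variable introduced in serverCallback.
	elapsed := time.Now().Sub(ttime).Nanoseconds()
	reqDuration.Add(elapsed)
	reqLatDist.Add(elapsed)

	fmt.Println(time.Duration(reqDuration.sumNs), len(reqLatDist.samples))
}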
43 changes: 37 additions & 6 deletions secondary/indexer/stats_manager.go
@@ -48,6 +48,14 @@ const APPROX_METRIC_COUNT = 25

var METRICS_PREFIX = "index_"

// 0-2ms, 2ms-5ms, 5ms-10ms, 10ms-20ms, 20ms-30ms, 30ms-50ms, 50ms-100ms, 100ms-Inf
var latencyDist = []int64{0, 2, 5, 10, 20, 30, 50, 100}

// end-end scan request latency
// 0-2ms, 2ms-5ms, 5ms-10ms, 10ms-20ms, 20ms-30ms, 30ms-50ms, 50ms-100ms, 100ms-1000ms,
// 1000ms-5000ms, 5000ms-10000ms, 10000ms-50000ms, 50000ms-Inf
var scanReqLatencyDist = []int64{0, 2, 5, 10, 20, 30, 50, 100, 1000, 5000, 10000, 50000}

func init() {
uptime = time.Now()
num_cpu_core = runtime.NumCPU()
@@ -66,6 +74,7 @@ type KeyspaceStats struct {
numRollbacks stats.Int64Val
numRollbacksToZero stats.Int64Val
tsQueueSize stats.Int64Val
flushLatDist stats.Histogram
}

// KeyspaceStats.Init initializes a per-keyspace stats object.
@@ -77,6 +86,7 @@ func (s *KeyspaceStats) Init(keyspaceId string) {
s.numMutationsQueued.Init()
s.tsQueueSize.Init()
s.numNonAlignTS.Init()
s.flushLatDist.InitLatency(latencyDist, func(v int64) string { return fmt.Sprintf("%vms", v/int64(time.Millisecond)) })
}

func (s *KeyspaceStats) addKeyspaceStatsToStatsMap(statMap *StatsMap) {
@@ -86,6 +96,7 @@ func (s *KeyspaceStats) addKeyspaceStatsToStatsMap(statMap *StatsMap) {
statMap.AddStatValueFiltered("num_mutations_queued", &s.numMutationsQueued)
statMap.AddStatValueFiltered("ts_queue_size", &s.tsQueueSize)
statMap.AddStatValueFiltered("num_nonalign_ts", &s.numNonAlignTS)
statMap.AddStatValueFiltered("flush_latency_dist", &s.flushLatDist)

bucket := GetBucketFromKeyspaceId(s.keyspaceId)
if st := common.BucketSeqsTiming(bucket); st != nil {
@@ -256,6 +267,12 @@ type IndexStats struct {
avgArrLenHolder stats.Int64Val
keySizeDist stats.MapVal
arrKeySizeDist stats.MapVal

scanReqInitLatDist stats.Histogram
scanReqWaitLatDist stats.Histogram
scanReqLatDist stats.Histogram
snapGenLatDist stats.Histogram
snapLatDist stats.Histogram
}

type IndexerStatsHolder struct {
@@ -490,6 +507,13 @@ func (s *IndexStats) Init() {
s.keySizeDist.Init()
s.arrKeySizeDist.Init()

s.scanReqInitLatDist.InitLatency(latencyDist, func(v int64) string { return fmt.Sprintf("%vms", v/int64(time.Millisecond)) })
s.scanReqWaitLatDist.InitLatency(latencyDist, func(v int64) string { return fmt.Sprintf("%vms", v/int64(time.Millisecond)) })
s.scanReqLatDist.InitLatency(scanReqLatencyDist, func(v int64) string { return fmt.Sprintf("%vms", v/int64(time.Millisecond)) })

s.snapGenLatDist.InitLatency(latencyDist, func(v int64) string { return fmt.Sprintf("%vms", v/int64(time.Millisecond)) })
s.snapLatDist.InitLatency(latencyDist, func(v int64) string { return fmt.Sprintf("%vms", v/int64(time.Millisecond)) })

s.partitions = make(map[common.PartitionId]*IndexStats)

// Set filters
@@ -1709,8 +1733,14 @@ func (s *IndexStats) addIndexStatsToMap(statMap *StatsMap, spec *statsSpec) {
statMap.AddStatValueFiltered("avg_array_length", &s.avgArrLenHolder)
}

statMap.AddStatValueFiltered("avg_scan_request_init_latency", &s.scanReqInitLat)
statMap.AddStatValueFiltered("scan_req_init_latency_dist", &s.scanReqInitLatDist)
statMap.AddStatValueFiltered("scan_req_wait_latency_dist", &s.scanReqWaitLatDist)
statMap.AddStatValueFiltered("scan_req_latency_dist", &s.scanReqWaitLatDist)
statMap.AddStatValueFiltered("snapshot_gen_latency_dist", &s.snapGenLatDist)
statMap.AddStatValueFiltered("snapshot_latency_dist", &s.snapLatDist)

if !spec.essential {
statMap.AddStatValueFiltered("avg_scan_request_init_latency", &s.scanReqInitLat)
statMap.AddStatValueFiltered("avg_scan_request_alloc_latency", &s.scanReqAllocLat)
}
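Note: each of the *_dist stats registered above is backed by the Histogram type extended in secondary/stats/histogram.go below; GetValue reports one counter per bucket, keyed by the humanized bucket boundaries. A purely hypothetical illustration of the shape for scan_req_latency_dist (the counts are invented and the middle buckets elided here):

    "scan_req_latency_dist": {
        "(-Inf-0ms)": 0,
        "(0ms-2ms)": 812,
        "(2ms-5ms)": 240,
        ...
        "(50000ms-Inf)": 0
    }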

@@ -1756,6 +1786,12 @@ func (s *IndexStats) addIndexStatsToMap(statMap *StatsMap, spec *statsSpec) {
},
&s.cacheHitPercent, s.partnAvgInt64Stats)

statMap.AddAggrTimingStatFiltered("timings/dcp_getseqs",
func(ss *IndexStats) *stats.TimingStat {
return &ss.Timings.dcpSeqs
},
&s.Timings.dcpSeqs, s.partnTimingStats)

// TODO:
// Right now, there aren't any consumer specific stats that are essential.
// But there needs a better way to handle this. May be a separate consumer
@@ -1767,11 +1803,6 @@ func (s *IndexStats) addIndexStatsToMap(statMap *StatsMap, spec *statsSpec) {
// If timing stat is partitioned, the final value
// is aggregated across the partitions (sum, count, sumOfSq).
// ------------------------------------------------------------
statMap.AddAggrTimingStatFiltered("timings/dcp_getseqs",
func(ss *IndexStats) *stats.TimingStat {
return &ss.Timings.dcpSeqs
},
&s.Timings.dcpSeqs, s.partnTimingStats)

statMap.AddAggrTimingStatFiltered("timings/storage_clone_handle",
func(ss *IndexStats) *stats.TimingStat {
7 changes: 5 additions & 2 deletions secondary/indexer/storage_manager.go
@@ -287,6 +287,7 @@ func (s *storageMgr) createSnapshotWorker(streamId common.StreamId, keyspaceId s
indexInstMap common.IndexInstMap, indexPartnMap IndexPartnMap, instIdList []common.IndexInstId,
stats *IndexerStats, flushWasAborted bool, hasAllSB bool) {

startTime := time.Now().UnixNano()
var needsCommit bool
var forceCommit bool
snapType := tsVbuuid.GetSnapType()
@@ -475,7 +476,7 @@ func (s *storageMgr) createSnapshotWorker(streamId common.StreamId, keyspaceId s
} else {
DestroyIndexSnapshot(is)
}
- s.updateSnapIntervalStat(idxStats)
+ s.updateSnapIntervalStat(idxStats, startTime)

}(idxInstId)
}
@@ -580,7 +581,7 @@ func (s *storageMgr) flushDone(streamId common.StreamId, keyspaceId string,
aborted: flushWasAborted}
}

- func (s *storageMgr) updateSnapIntervalStat(idxStats *IndexStats) {
+ func (s *storageMgr) updateSnapIntervalStat(idxStats *IndexStats, startTime int64) {

// Compute avgTsInterval
last := idxStats.lastTsTime.Value()
@@ -592,6 +593,8 @@ func (s *storageMgr) updateSnapIntervalStat(idxStats *IndexStats) {
idxStats.avgTsInterval.Set(avg)
idxStats.sinceLastSnapshot.Set(curr - last)
}
idxStats.snapLatDist.Add(curr - last)
idxStats.snapGenLatDist.Add(curr - startTime)
idxStats.lastTsTime.Set(curr)

idxStats.updateAllPartitionStats(
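Note: the two samples added to updateSnapIntervalStat measure different things: snapLatDist records the gap between consecutive snapshot timestamps (curr - last, the same quantity already feeding avgTsInterval), while snapGenLatDist records how long this snapshot took from the top of createSnapshotWorker (curr - startTime). A standalone sketch of the arithmetic, with local variables standing in for the IndexStats fields:

package main

import (
	"fmt"
	"time"
)

func main() {
	last := time.Now().UnixNano() // previous snapshot timestamp (idxStats.lastTsTime)

	time.Sleep(10 * time.Millisecond) // time between flushes

	startTime := time.Now().UnixNano() // createSnapshotWorker begins
	time.Sleep(5 * time.Millisecond)   // stands in for building the storage snapshot
	curr := time.Now().UnixNano()      // updateSnapIntervalStat runs

	snapInterval := curr - last     // the sample recorded into snapLatDist
	snapGenTime := curr - startTime // the sample recorded into snapGenLatDist

	fmt.Println("interval:", time.Duration(snapInterval), "generation:", time.Duration(snapGenTime))
}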
2 changes: 1 addition & 1 deletion secondary/manager/lifecycle.go
@@ -4618,7 +4618,7 @@ func newBuilder(mgr *LifecycleMgr) *builder {
builder := &builder{
manager: mgr,
pendings: make(map[string][]uint64),
- notifych: make(chan *common.IndexDefn, 10000),
+ notifych: make(chan *common.IndexDefn, 50000),
batchSize: int32(common.SystemConfig["indexer.settings.build.batch_size"].Int()),
commandListener: mc.NewCommandListener(donech, false, true, false, false, false, false),
listenerDonech: donech,
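Note: the only change here is the notifych buffer growing from 10,000 to 50,000 queued *common.IndexDefn notifications, letting the lifecycle manager accept more pending build notifications before a sender would block. A toy sketch of why the buffer capacity matters; the element type and size here are placeholders, not the lifecycle manager's:

package main

import "fmt"

// indexDefn is a placeholder for *common.IndexDefn.
type indexDefn struct{ id uint64 }

func main() {
	notifych := make(chan *indexDefn, 4) // the real channel is now buffered at 50000

	// Sends succeed without a receiver until the buffer is full; a larger
	// buffer lets more notifications queue up before the producer would block.
	for i := uint64(0); i < 4; i++ {
		notifych <- &indexDefn{id: i}
	}
	fmt.Println("queued:", len(notifych), "of capacity", cap(notifych))
}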
61 changes: 56 additions & 5 deletions secondary/stats/histogram.go
@@ -4,27 +4,50 @@ import (
"fmt"
"math"
"sync/atomic"
"time"
)

type Histogram struct {
buckets []int64
vals []int64
humanizeFn func(int64) string
bitmap uint64
}

func (h *Histogram) Init(buckets []int64, humanizeFn func(int64) string) {
l := len(buckets)
h.buckets = make([]int64, l+2)
- copy(h.buckets[1:l], buckets)
+ copy(h.buckets[1:l+1], buckets)
h.buckets[0] = math.MinInt64
- h.buckets[l] = math.MaxInt64
- h.vals = make([]int64, l)
+ h.buckets[l+1] = math.MaxInt64
+ h.vals = make([]int64, l+1)

if humanizeFn == nil {
humanizeFn = func(v int64) string { return fmt.Sprint(v) }
}

h.humanizeFn = humanizeFn

h.bitmap = AllStatsFilter
}

func (h *Histogram) InitLatency(buckets []int64, humanizeFn func(int64) string) {
l := len(buckets)
h.buckets = make([]int64, l+2)
for i := 1; i <= l; i++ {
h.buckets[i] = buckets[i-1] * int64(time.Millisecond)
}
h.buckets[0] = math.MinInt64
h.buckets[l+1] = math.MaxInt64
h.vals = make([]int64, l+1)

if humanizeFn == nil {
humanizeFn = func(v int64) string { return fmt.Sprint(v) }
}

h.humanizeFn = humanizeFn

h.bitmap = AllStatsFilter
}
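Note: InitLatency mirrors Init but treats the supplied boundaries as milliseconds, scaling each to nanoseconds and bracketing them with MinInt64/MaxInt64 sentinels, so samples recorded in nanoseconds (for example time.Now().UnixNano() - start) land in the intended millisecond ranges: with latencyDist a 7 ms sample falls in the 5ms-10ms bucket, and with scanReqLatencyDist a 250 ms scan falls in the 100ms-1000ms bucket. A standalone sketch of the resulting layout; the linear lookup shown is an assumption about findBucket's behavior, whose body is folded out of this diff:

package main

import (
	"fmt"
	"math"
	"time"
)

func main() {
	boundaries := []int64{0, 2, 5, 10, 20, 30, 50, 100} // latencyDist, in ms
	l := len(boundaries)

	// Same layout InitLatency builds: sentinels at both ends, boundaries in ns.
	buckets := make([]int64, l+2)
	for i := 1; i <= l; i++ {
		buckets[i] = boundaries[i-1] * int64(time.Millisecond)
	}
	buckets[0] = math.MinInt64
	buckets[l+1] = math.MaxInt64
	vals := make([]int64, l+1) // one counter per adjacent pair of boundaries

	// Hypothetical linear lookup: count a 7ms sample into the bucket whose
	// half-open range [buckets[i], buckets[i+1]) contains it.
	sample := (7 * time.Millisecond).Nanoseconds()
	for i := 0; i < len(buckets)-1; i++ {
		if sample >= buckets[i] && sample < buckets[i+1] {
			vals[i]++
			fmt.Printf("7ms sample lands in bucket %d (%dms-%dms)\n",
				i, buckets[i]/int64(time.Millisecond), buckets[i+1]/int64(time.Millisecond))
			break
		}
	}
}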

func (h *Histogram) Add(val int64) {
@@ -46,7 +69,7 @@ func (h *Histogram) findBucket(val int64) int {
return 0
}

- func (h Histogram) String() string {
+ func (h *Histogram) String() string {
s := "\""
l := len(h.vals)
for i := 0; i < l; i++ {
@@ -63,6 +86,34 @@ func (h Histogram) String() string {
return s
}

- func (h Histogram) MarshalJSON() ([]byte, error) {
+ func (h *Histogram) MarshalJSON() ([]byte, error) {
return []byte(h.String()), nil
}

func (h *Histogram) AddFilter(bitmap uint64) {
h.bitmap |= bitmap
}

func (h *Histogram) Map(bitmap uint64) bool {
return (h.bitmap & bitmap) != 0
}

func (h *Histogram) GetValue() interface{} {
out := make(map[string]interface{})
for i := 0; i < len(h.buckets)-1; i++ {

low := h.humanizeFn(h.buckets[i])
hi := h.humanizeFn(h.buckets[i+1])

var key string
if h.buckets[i] == math.MinInt64 {
key = fmt.Sprintf("(-Inf-%v)", hi)
} else if h.buckets[i+1] == math.MaxInt64 {
key = fmt.Sprintf("(%v-Inf)", low)
} else {
key = fmt.Sprintf("(%v-%v)", low, hi)
}
out[key] = h.vals[i]
}
return out
}
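Note: a short hypothetical usage of the extended type: initialize with millisecond boundaries, record a nanosecond sample, and read back per-bucket counts via GetValue. This sketch assumes only the API visible in this diff (InitLatency, Add, GetValue) on the package at secondary/stats; it is not taken from the indexer or its tests.

package main

import (
	"fmt"
	"time"

	"github.com/couchbase/indexing/secondary/stats"
)

func main() {
	var h stats.Histogram
	// Boundaries in ms, humanized the same way stats_manager.go does.
	h.InitLatency([]int64{0, 2, 5, 10, 20, 30, 50, 100},
		func(v int64) string { return fmt.Sprintf("%vms", v/int64(time.Millisecond)) })

	start := time.Now().UnixNano()
	time.Sleep(3 * time.Millisecond) // some timed operation
	h.Add(time.Now().UnixNano() - start)

	// GetValue returns one count per bucket, keyed like "(2ms-5ms)".
	for k, v := range h.GetValue().(map[string]interface{}) {
		fmt.Println(k, v)
	}
}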
