Skip to content

Commit

Permalink
CBG-117 Improved stats (#3852)
Browse files Browse the repository at this point in the history
* CBG-117 Improved cache stats

Implements new Iridium cache stats.

* Avoid race when iterating over database set

Cleans up a few other cases where we weren't getting the lock, or were unnecessarily getting a write lock.
  • Loading branch information
adamcfraser authored and tleyden committed Dec 6, 2018
1 parent 24114ec commit 0c5d11c
Show file tree
Hide file tree
Showing 8 changed files with 326 additions and 36 deletions.
18 changes: 10 additions & 8 deletions base/expvar.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ var (

// Per-replication (sg-replicate) stats
PerReplicationStats *expvar.Map

)

const (
Expand All @@ -48,12 +47,16 @@ const (
const (

// StatsCache
StatKeyRevisionCacheHits = "rev_cache_hits"
StatKeyRevisionCacheMisses = "rev_cache_misses"
StatKeyChanCachePerf = "chan_cache_perf"
StatKeyRevCacheUtilization = "rev_cache_utilization"
StatKeyChanCacheUtilization = "chan_cache_utilization"
StatKeyNumSkippedSeqs = "num_skipped_seqs"
StatKeyRevisionCacheHits = "rev_cache_hits"
StatKeyRevisionCacheMisses = "rev_cache_misses"
StatKeyChannelCacheHits = "chan_cache_hits"
StatKeyChannelCacheMisses = "chan_cache_misses"
StatKeyChannelCacheRevsActive = "chan_cache_active_revs"
StatKeyChannelCacheRevsTombstone = "chan_cache_tombstone_revs"
StatKeyChannelCacheRevsRemoval = "chan_cache_removal_revs"
StatKeyChannelCacheNumChannels = "chan_cache_num_channels"
StatKeyChannelCacheMaxEntries = "chan_cache_max_entries"
StatKeyNumSkippedSeqs = "num_skipped_seqs"

// StatsDatabase
StatKeyNumReplicationConnsActive = "num_replication_conns_active"
Expand Down Expand Up @@ -172,7 +175,6 @@ func init() {

}


// Removes the per-replication stats for this replication id by
// regenerating a new expvar map without that particular replicationUuid
func RemovePerReplicationStats(replicationUuid string) {
Expand Down
18 changes: 11 additions & 7 deletions base/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -1055,27 +1055,31 @@ func ReplaceAll(s, chars, new string) string {
return s
}

// ExpvarIntVal converts an int into an *expvar.Int.
func ExpvarIntVal(val int) *expvar.Int {
	value := expvar.Int{}
	value.Set(int64(val))
	return &value
}

// ExpvarInt64Val converts an int64 into an *expvar.Int.
func ExpvarInt64Val(val int64) *expvar.Int {
	value := expvar.Int{}
	value.Set(val)
	return &value
}

func ExpvarUInt64Val(val uint64) expvar.Var {
func ExpvarUInt64Val(val uint64) *expvar.Int {
value := expvar.Int{}
value.Set(int64(val)) // lossy, but expvar doesn't provide an alternative
if val > math.MaxInt64 {
value.Set(math.MaxInt64) // lossy, but expvar doesn't provide an alternative
} else {
value.Set(int64(val))
}
return &value
}

// Convert a float into an expvar.Var
func ExpvarFloatVal(val float64) expvar.Var {
// Convert a float into an *expvar.Float
func ExpvarFloatVal(val float64) *expvar.Float {
value := expvar.Float{}
value.Set(float64(val))
return &value
Expand Down
22 changes: 18 additions & 4 deletions db/change_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,8 +254,8 @@ func (c *changeCache) CleanAgedItems() {
c.lock.Lock()
defer c.lock.Unlock()

for channelName := range c.channelCaches {
c._getChannelCache(channelName).pruneCacheAge()
for _, channelCache := range c.channelCaches {
channelCache.pruneCacheAge()
}

return
Expand Down Expand Up @@ -532,8 +532,8 @@ func (c *changeCache) Remove(docIDs []string, startTime time.Time) (count int) {
c.lock.Lock()
defer c.lock.Unlock()

for channelName := range c.channelCaches {
count += c._getChannelCache(channelName).Remove(docIDs, startTime)
for _, channelCache := range c.channelCaches {
count += channelCache.Remove(docIDs, startTime)
}

return count
Expand Down Expand Up @@ -764,6 +764,7 @@ func (c *changeCache) _getChannelCache(channelName string) *channelCache {

cache = newChannelCacheWithOptions(c.context, channelName, validFrom, c.options)
c.channelCaches[channelName] = cache
c.context.DbStats.StatsCache().Add(base.StatKeyChannelCacheNumChannels, 1)
}
return cache
}
Expand Down Expand Up @@ -810,6 +811,19 @@ func (c *changeCache) getOldestSkippedSequence() uint64 {
}
}

// MaxCacheSize returns the size of the largest per-channel cache.
// Holds a read lock on the change cache while iterating the channel cache map.
func (c *changeCache) MaxCacheSize() int {
	c.lock.RLock()
	defer c.lock.RUnlock()

	largest := 0
	for _, cache := range c.channelCaches {
		if size := cache.GetSize(); size > largest {
			largest = size
		}
	}
	return largest
}

// Set the initial sequence. Presumes that change cache is already locked.
func (c *changeCache) _setInitialSequence(initialSequence uint64) {
c.initialSequence = initialSequence
Expand Down
7 changes: 7 additions & 0 deletions db/change_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,13 @@ func e(seq uint64, docid string, revid string) *LogEntry {
}
}

// et builds a tombstoned (deleted) log entry for tests, based on e().
func et(seq uint64, docid string, revid string) *LogEntry {
	tombstone := e(seq, docid, revid)
	tombstone.SetDeleted()
	return tombstone
}

func testBucketContext() *DatabaseContext {

context, _ := NewDatabaseContext("db", testBucket().Bucket, false, DatabaseContextOptions{})
Expand Down
36 changes: 34 additions & 2 deletions db/channel_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,9 @@ func (c *channelCache) Remove(docIDs []string, startTime time.Time) (count int)
continue
}

// Decrement utilization stats for removed entry
c.UpdateCacheUtilization(c.logs[i], -1)

// Memory-leak safe delete from SliceTricks:
copy(c.logs[i:], c.logs[i+1:])
c.logs[len(c.logs)-1] = nil
Expand All @@ -225,6 +228,7 @@ func (c *channelCache) _pruneCacheLength() (pruned int) {
if len(c.logs) > c.options.ChannelCacheMaxLength {
pruned = len(c.logs) - c.options.ChannelCacheMaxLength
for i := 0; i < pruned; i++ {
c.UpdateCacheUtilization(c.logs[i], -1)
delete(c.cachedDocIDs, c.logs[i].DocID)
}
c.validFrom = c.logs[pruned-1].Sequence + 1
Expand All @@ -250,6 +254,7 @@ func (c *channelCache) pruneCacheAge() {
// those that fit within channelCacheMinLength and therefore not subject to cache age restrictions
for len(c.logs) > c.options.ChannelCacheMinLength && time.Since(c.logs[0].TimeReceived) > c.options.ChannelCacheAge {
c.validFrom = c.logs[0].Sequence + 1
c.UpdateCacheUtilization(c.logs[0], -1)
delete(c.cachedDocIDs, c.logs[0].DocID)
c.logs = c.logs[1:]
pruned++
Expand Down Expand Up @@ -317,6 +322,7 @@ func (c *channelCache) GetChanges(options ChangesOptions) ([]*LogEntry, error) {
}
startSeq := options.Since.SafeSequence() + 1
if cacheValidFrom <= startSeq {
c.context.DbStats.StatsCache().Add(base.StatKeyChannelCacheHits, 1)
return resultFromCache, nil
}

Expand All @@ -333,11 +339,13 @@ func (c *channelCache) GetChanges(options ChangesOptions) ([]*LogEntry, error) {
base.UD(c.channelName), options.Since.String(), len(resultFromCache)-numFromCache, cacheValidFrom)
}
if cacheValidFrom <= startSeq {
c.context.DbStats.StatsCache().Add(base.StatKeyChannelCacheHits, 1)
return resultFromCache, nil
}

// Now query the view. We set the max sequence equal to cacheValidFrom, so we'll get one
// overlap, which helps confirm that we've got everything.
c.context.DbStats.StatsCache().Add(base.StatKeyChannelCacheMisses, 1)
resultFromView, err := c.context.getChangesInChannelFromQuery(c.channelName, cacheValidFrom,
options)
if err != nil {
Expand All @@ -363,6 +371,7 @@ func (c *channelCache) GetChanges(options ChangesOptions) ([]*LogEntry, error) {
result = append(result, resultFromCache[0:n]...)
}
base.Infof(base.KeyCache, "GetChangesInChannel(%q) --> %d rows", base.UD(c.channelName), len(result))

return result, nil
}

Expand Down Expand Up @@ -392,7 +401,9 @@ func (c *channelCache) _appendChange(change *LogEntry) {
if _, found := c.cachedDocIDs[change.DocID]; found {
for i := end; i >= 0; i-- {
if log[i].DocID == change.DocID {
c.UpdateCacheUtilization(log[i], -1)
copy(log[i:], log[i+1:])
c.UpdateCacheUtilization(change, 1)
log[end] = change
return
}
Expand All @@ -403,16 +414,30 @@ func (c *channelCache) _appendChange(change *LogEntry) {
c._adjustFirstSeq(change)
}
c.logs = append(log, change)

c.UpdateCacheUtilization(change, 1)
c.cachedDocIDs[change.DocID] = struct{}{}
}

// UpdateCacheUtilization adjusts the channel cache utilization stats by delta,
// bucketed by entry type. Entries that are both removals and tombstones are
// counted as removals.
func (c *channelCache) UpdateCacheUtilization(entry *LogEntry, delta int64) {
	stats := c.context.DbStats.StatsCache()
	switch {
	case entry.IsRemoved():
		stats.Add(base.StatKeyChannelCacheRevsRemoval, delta)
	case entry.IsDeleted():
		stats.Add(base.StatKeyChannelCacheRevsTombstone, delta)
	default:
		stats.Add(base.StatKeyChannelCacheRevsActive, delta)
	}
}

// Insert out-of-sequence entry into the cache. If the docId is already present in a later
// sequence, we skip the insert. If the docId is already present in an earlier sequence,
// we remove the earlier sequence.
func (c *channelCache) insertChange(log *LogEntries, change *LogEntry) {

defer func() {
c.cachedDocIDs[change.DocID] = struct{}{}
c.UpdateCacheUtilization(change, 1)
}()

end := len(*log) - 1
Expand All @@ -434,7 +459,8 @@ func (c *channelCache) insertChange(log *LogEntries, change *LogEntry) {
// we've already cached a later revision of this document, can ignore update
return
} else {
// found existing prior to insert position
// found existing prior to insert position. Decrement utilization for replaced version
c.UpdateCacheUtilization((*log)[i], -1)
if i == insertAtIndex-1 {
// The sequence is adjacent to another with the same docId - replace it
// instead of inserting
Expand All @@ -455,7 +481,6 @@ func (c *channelCache) insertChange(log *LogEntries, change *LogEntry) {
*log = append(*log, nil)
copy((*log)[insertAtIndex+1:], (*log)[insertAtIndex:])
(*log)[insertAtIndex] = change

return
}

Expand Down Expand Up @@ -532,6 +557,7 @@ func (c *channelCache) prependChanges(changes LogEntries, changesValidFrom uint6
copy(entriesToPrepend[1:], entriesToPrepend)
entriesToPrepend[0] = change
c.cachedDocIDs[change.DocID] = struct{}{}
c.UpdateCacheUtilization(change, 1)

if len(entriesToPrepend) >= cacheCapacity {
// If we reach capacity before prepending the entire set of changes, set changesValidFrom to the oldest sequence
Expand All @@ -558,6 +584,12 @@ func (c *channelCache) prependChanges(changes LogEntries, changesValidFrom uint6
}
}

// GetSize returns the number of entries currently held in the channel cache,
// under a read lock.
func (c *channelCache) GetSize() int {
	c.lock.RLock()
	size := len(c.logs)
	c.lock.RUnlock()
	return size
}

type lateLogEntry struct {
logEntry *LogEntry
arrived time.Time // Time arrived in late log - for diagnostics tracking
Expand Down
Loading

0 comments on commit 0c5d11c

Please sign in to comment.