Skip to content

Commit

Permalink
MB-54799 add primitives per bucket diagnostics to log, vitals, stats, prometheus
Browse files Browse the repository at this point in the history

Change-Id: I4f72c3f4b22a42c9fcad8d8bd22b69ae6ae1d689
Reviewed-on: https://review.couchbase.org/c/query/+/185202
Reviewed-by: Donald Haggart <donald.haggart@couchbase.com>
Tested-by: Marco Greco <marco.greco@couchbase.com>
  • Loading branch information
Marco Greco committed Jan 18, 2023
1 parent 337f30f commit 801f9ba
Show file tree
Hide file tree
Showing 6 changed files with 134 additions and 29 deletions.
12 changes: 12 additions & 0 deletions datastore/couchbase/couchbase.go
Expand Up @@ -373,6 +373,14 @@ func (s *store) NamespaceByName(name string) (p datastore.Namespace, e errors.Er
return p, nil
}

// ForeachBucket invokes f on every loaded bucket across all cached
// namespaces.  Entries whose bucket is not (or no longer) loaded are
// skipped: passing a nil *cbKeyspace through the ExtendedBucket interface
// parameter would yield a non-nil interface holding a nil pointer, and
// callers invoke methods (Name, GetIOStats) on it unconditionally.
// NOTE(review): iteration takes no lock on the caches — assumes callers
// tolerate a racy snapshot; confirm against the cache's locking rules.
func (s *store) ForeachBucket(f func(datastore.ExtendedBucket)) {
	for _, n := range s.namespaceCache {
		for _, k := range n.keyspaceCache {
			if k.cbKeyspace != nil {
				f(k.cbKeyspace)
			}
		}
	}
}

// The ns_server admin API is open iff we can access the /pools API without a password.
func (s *store) adminIsOpen() bool {
url := s.connectionUrl + "/pools"
Expand Down Expand Up @@ -1610,6 +1618,10 @@ func (p *namespace) KeyspaceUpdateCallback(bucket *cb.Bucket) {
}
}

// GetIOStats reports the IO statistics of the underlying couchbase bucket.
// reset clears the counters as they are read; all forces reporting of
// zero-valued statistics as well.
func (b *keyspace) GetIOStats(reset bool, all bool) map[string]interface{} {
	stats := b.cbbucket.GetIOStats(reset, all)
	return stats
}

// NamespaceId returns the id of the namespace containing this keyspace.
func (b *keyspace) NamespaceId() string {
	id := b.namespace.Id()
	return id
}
Expand Down
11 changes: 9 additions & 2 deletions datastore/datastore.go
Expand Up @@ -7,7 +7,6 @@
// the file licenses/APL2.txt.

/*
Package datastore provides a common datastore abstraction over storage
engines, such as Couchbase server, cloud, mobile, file, 3rd-party
databases and storage engines, etc.
Expand All @@ -17,7 +16,6 @@ The logical hierarchy for the query language is datastore -> namespace -> bucket
TODO: This hierarchy should be revisited and aligned with long-term
plans before query Beta / GA.
*/
package datastore

Expand Down Expand Up @@ -86,6 +84,11 @@ type Systemstore interface {
PrivilegesFromPath(fullname string, keyspace string, privilege auth.Privilege, privs *auth.Privileges)
}

// Datastore2 extends Datastore with the ability to enumerate the currently
// loaded buckets, so per-bucket diagnostics (e.g. IO statistics) can be
// gathered without knowing bucket names in advance.
type Datastore2 interface {
	Datastore
	ForeachBucket(func(ExtendedBucket))
}

type AuditInfo struct {
AuditEnabled bool
EventDisabled map[uint32]bool
Expand Down Expand Up @@ -162,6 +165,10 @@ type Bucket interface {
CreateScope(name string) errors.Error // Create a new scope
DropScope(name string) errors.Error // Drop a scope
}
// ExtendedBucket is implemented by buckets that can report IO statistics.
type ExtendedBucket interface {
	Bucket
	GetIOStats(bool, bool) map[string]interface{} // get an object containing IO stats for the bucket
}

type Scope interface {
Id() string
Expand Down
23 changes: 18 additions & 5 deletions primitives/couchbase/memcached.go
Expand Up @@ -454,10 +454,12 @@ func (b *Bucket) do3(vb uint16, f func(mc *memcached.Client, vb uint16) error, d
if lastError == nil {
return nil
}

if !desc.retry {
desc.attempts++
break
}
atomic.AddUint64(&b.retryCount, 1)

// KV is not willing to service any more requests for this interval
if desc.delay > time.Duration(0) {
Expand Down Expand Up @@ -685,17 +687,19 @@ func backOff(attempt, maxAttempts int, duration time.Duration, exponential bool)

func (b *Bucket) doBulkGet(vb uint16, keys []string, active func() bool, reqDeadline time.Time,
ech chan<- error, subPaths []string, useReplica bool, eStatus *errorStatus,
context ...*memcached.ClientContext) (map[string]*gomemcached.MCResponse, time.Duration) {
context ...*memcached.ClientContext) (map[string]*gomemcached.MCResponse, time.Duration, int) {

rv := _STRING_MCRESPONSE_POOL.Get()
done := false
bname := b.Name
retries := 0
desc := &doDescriptor{useReplicas: useReplica, version: b.Version, maxTries: b.backOffRetries()}
var lastError error
for ; desc.attempts < maxBulkRetries && !done && !eStatus.errStatus; desc.attempts++ {
if !active() {
return rv, desc.delay
return rv, desc.delay, retries
}
retries++

// This stack frame exists to ensure we can clean up
// connection at a reasonable time.
Expand Down Expand Up @@ -762,7 +766,7 @@ func (b *Bucket) doBulkGet(vb uint16, keys []string, active func() bool, reqDead
}()

if err != nil {
return rv, time.Duration(0)
return rv, time.Duration(0), retries
}
}

Expand All @@ -772,7 +776,7 @@ func (b *Bucket) doBulkGet(vb uint16, keys []string, active func() bool, reqDead
ech <- err
}

return rv, desc.delay
return rv, desc.delay, retries
}

type errorStatus struct {
Expand Down Expand Up @@ -844,6 +848,7 @@ func vbBulkGetWorker(ch chan *vbBulkGet) {
func vbDoBulkGet(vbg *vbBulkGet) time.Duration {
var delay time.Duration
var rv map[string]*gomemcached.MCResponse
var retries int

defer func() {

Expand All @@ -854,7 +859,10 @@ func vbDoBulkGet(vbg *vbBulkGet) time.Duration {
// Workers cannot panic and die
recover()
}()
rv, delay = vbg.b.doBulkGet(vbg.k, vbg.keys, vbg.active, vbg.reqDeadline, vbg.ech, vbg.subPaths, vbg.useReplica, vbg.groupError, vbg.context...)
rv, delay, retries = vbg.b.doBulkGet(vbg.k, vbg.keys, vbg.active, vbg.reqDeadline, vbg.ech, vbg.subPaths, vbg.useReplica, vbg.groupError, vbg.context...)
if retries > 0 {
atomic.AddUint64(&vbg.b.retryCount, uint64(retries))
}
if delay != time.Duration(0) {

// find received documents and remove the keys from the key slice
Expand All @@ -879,6 +887,7 @@ func vbDoBulkGet(vbg *vbBulkGet) time.Duration {
}
if len(rv) > 0 {
vbg.ch <- rv
atomic.AddUint64(&vbg.b.readCount, uint64(len(rv)))
}
return delay
}
Expand Down Expand Up @@ -1161,6 +1170,7 @@ func (b *Bucket) WriteCasWithMT(k string, flags, exp int, cas uint64, v interfac
return err
})

atomic.AddUint64(&b.writeCount, 1)
if res != nil {
_, wu = res.ComputeUnits()
}
Expand Down Expand Up @@ -1266,6 +1276,7 @@ func (b *Bucket) GetsMC(key string, active func() bool, reqDeadline time.Time, u
}
return nil
}, false, useReplica, b.backOffRetries())
atomic.AddUint64(&b.readCount, 1)
return response, err
}

Expand Down Expand Up @@ -1365,6 +1376,7 @@ func (b *Bucket) Incr(k string, amt, def uint64, exp int, context ...*memcached.
rv = res
return nil
})
atomic.AddUint64(&b.writeCount, 1)
return rv, err
}

Expand All @@ -1379,6 +1391,7 @@ func (b *Bucket) Decr(k string, amt, def uint64, exp int, context ...*memcached.
rv = res
return nil
})
atomic.AddUint64(&b.writeCount, 1)
return rv, err
}

Expand Down
57 changes: 50 additions & 7 deletions primitives/couchbase/ns_server.go
Expand Up @@ -25,6 +25,7 @@ import (
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
"unsafe"

Expand Down Expand Up @@ -249,6 +250,9 @@ type VBucketServerMap struct {
// take a boolean parameter "bucketLocked".
type Bucket struct {
sync.RWMutex
readCount uint64
writeCount uint64
retryCount uint64
AuthType string `json:"authType"`
Capabilities []string `json:"bucketCapabilities"`
CapabilitiesVersion string `json:"bucketCapabilitiesVer"`
Expand Down Expand Up @@ -977,13 +981,14 @@ func Connect(baseU string, userAgent string) (Client, error) {
// The map key is the name of the scope.
// Example data:
// {"uid":"b","scopes":[
// {"name":"_default","uid":"0","collections":[
// {"name":"_default","uid":"0"}]},
// {"name":"myScope1","uid":"8","collections":[
// {"name":"myCollectionB","uid":"c"},
// {"name":"myCollectionA","uid":"b"}]},
// {"name":"myScope2","uid":"9","collections":[
// {"name":"myCollectionC","uid":"d"}]}]}
//
// {"name":"_default","uid":"0","collections":[
// {"name":"_default","uid":"0"}]},
// {"name":"myScope1","uid":"8","collections":[
// {"name":"myCollectionB","uid":"c"},
// {"name":"myCollectionA","uid":"b"}]},
// {"name":"myScope2","uid":"9","collections":[
// {"name":"myCollectionC","uid":"d"}]}]}
type InputManifest struct {
Uid string
Scopes []InputScope
Expand Down Expand Up @@ -1314,6 +1319,44 @@ func (b *Bucket) StopUpdater() {
}
}

// GetIOStats returns the bucket's IO statistics (reads, writes, retries).
// When reset is true the counters are atomically swapped to zero as they are
// read, so activity after this call accrues to the next reporting interval.
// When all is true every statistic is reported even if zero; otherwise only
// non-zero statistics are included.  Returns nil when nothing qualifies.
func (b *Bucket) GetIOStats(reset bool, all bool) map[string]interface{} {
	var readCount uint64
	var writeCount uint64
	var retryCount uint64

	if reset {
		readCount = atomic.SwapUint64(&b.readCount, uint64(0))
		writeCount = atomic.SwapUint64(&b.writeCount, uint64(0))
		retryCount = atomic.SwapUint64(&b.retryCount, uint64(0))
	} else {
		readCount = atomic.LoadUint64(&b.readCount)
		writeCount = atomic.LoadUint64(&b.writeCount)
		retryCount = atomic.LoadUint64(&b.retryCount)
	}

	// Allocate the result map lazily so callers polling an idle bucket
	// (reset == false, all == false) get nil rather than an empty map.
	var rv map[string]interface{}
	add := func(name string, count uint64) {
		if count != 0 || all {
			if rv == nil {
				rv = make(map[string]interface{})
			}
			rv[name] = count
		}
	}
	add("reads", readCount)
	add("writes", writeCount)
	add("retries", retryCount)
	return rv
}

func bucketFinalizer(b *Bucket) {
if b.connPools != nil {
if !b.closed {
Expand Down
36 changes: 25 additions & 11 deletions server/http/admin_accounting_endpoint.go
Expand Up @@ -311,12 +311,26 @@ func doPrometheusLow(endpoint *HttpEndpoint, w http.ResponseWriter, req *http.Re
w.Write([]byte(fmt.Sprintf("%v\n", localValue(endpoint.server, name))))
}

bName := "bucket"
if tenant.IsServerless() {
tenant.Foreach(func(n string, m memory.MemoryManager) {
w.Write([]byte("# TYPE n1ql_tenant_memory gauge\n"))
w.Write([]byte("n1ql_tenant_memory{bucket=\"" + n + "\"} "))
w.Write([]byte(fmt.Sprintf("%v\n", m.AllocatedMemory())))
})
bName = "tenant"
}
store, ok := datastore.GetDatastore().(datastore.Datastore2)
if ok {
store.ForeachBucket(func(b datastore.ExtendedBucket) {
stats := b.GetIOStats(false, true)
for n, s := range stats {
statName := "n1ql_" + bName + "_" + n
w.Write([]byte("# TYPE n1ql_" + statName + " gauge\n"))
w.Write([]byte(statName + "{bucket=\"" + b.Name() + "\"} "))
w.Write([]byte(fmt.Sprintf("%v\n", s)))
}
})
}

return textPlain(""), nil
Expand Down Expand Up @@ -1274,17 +1288,17 @@ func (r remapper) remap(bucket string, path []string) {
}

// Restore semantics:
// - for global functions, no include, exclude remap is possible.
// any non global functions passed will simply be skipped
// - for scope functions, include, exclude and remap will only operate at scope level
// currently no check is made that the target scope exist, which may very well leave stale function definitions
// - for both cases the only thing that counts is the name, and not the signature, it is therefore possible to go back in
// time and restore function definitions with different parameter lists
// - remapping to an existing scope will replace existing functions, with the same or different signature, which may not be
// intended, however a different conflict resolution would prevent going back in time
// - be aware that remapping may have other side effects: for query context based statements contained within functions, the new targets
// will be under the new bucket / scope query context, while accesses with full path will remain unchanged.
this makes perfect sense, but may not necessarily be what the user intended
// - for global functions, no include, exclude remap is possible.
// any non global functions passed will simply be skipped
// - for scope functions, include, exclude and remap will only operate at scope level
// currently no check is made that the target scope exist, which may very well leave stale function definitions
// - for both cases the only thing that counts is the name, and not the signature, it is therefore possible to go back in
// time and restore function definitions with different parameter lists
// - remapping to an existing scope will replace existing functions, with the same or different signature, which may not be
// intended, however a different conflict resolution would prevent going back in time
// - be aware that remapping may have other side effects: for query context based statements contained within functions, the new targets
// will be under the new bucket / scope query context, while accesses with full path will remain unchanged.
// this makes perfect sense, but may not necessarily be what the user intended
func doFunctionRestore(v []byte, l int, b string, include, exclude matcher, remap remapper) errors.Error {
var oState json.KeyState

Expand Down
24 changes: 20 additions & 4 deletions server/serverstats.go
Expand Up @@ -17,6 +17,7 @@ import (
"time"

json "github.com/couchbase/go_json"
"github.com/couchbase/query/datastore"
"github.com/couchbase/query/logging"
"github.com/couchbase/query/memory"
"github.com/couchbase/query/tenant"
Expand Down Expand Up @@ -48,9 +49,7 @@ type statsCollector struct {
// Cpu/Memory Collector
//////////////////////////////////////////////////////////////

//
// Start Stats collection
//
func (this *Server) StartStatsCollector() (err error) {

collector := &statsCollector{server: this}
Expand All @@ -69,9 +68,7 @@ func (this *Server) StartStatsCollector() (err error) {
return nil
}

//
// Gather Cpu/Memory
//
func (c *statsCollector) runCollectStats() {
var lastGC uint64
ticker := time.NewTicker(_STATS_INTRVL)
Expand Down Expand Up @@ -124,6 +121,25 @@ func (c *statsCollector) runCollectStats() {
})
newStats["tenant.memory.usage"] = tenants
}

// get per bucket stats
var bstats map[string]interface{}
store, ok := datastore.GetDatastore().(datastore.Datastore2)
if ok {
store.ForeachBucket(func(b datastore.ExtendedBucket) {
stats := b.GetIOStats(false, false)
if len(stats) != 0 {
if bstats == nil {
bstats = make(map[string]interface{})
}
bstats[b.Name()] = stats
}
})
}
if bstats != nil {
newStats["bucket.IO.stats"] = bstats
}

oldStats = c.server.AccountingStore().ExternalVitals(newStats)
newStats = oldStats

Expand Down

0 comments on commit 801f9ba

Please sign in to comment.