diff --git a/CHANGES.md b/CHANGES.md index fc47300bf..515a21810 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -10,6 +10,7 @@ Release Notes. - [UI] Add YAML editor for inputting query criteria. - Refactor TopN to support `NULL` group while keeping seriesID from the source measure. - Add a sharded buffer to TSDB to replace Badger's memtable. Badger KV only provides SST. +- Add a meter system to control the internal metrics. ### Chores diff --git a/api/common/id.go b/api/common/id.go index b625028db..41ae75519 100644 --- a/api/common/id.go +++ b/api/common/id.go @@ -21,8 +21,6 @@ import ( "context" "fmt" - "github.com/prometheus/client_golang/prometheus" - "github.com/apache/skywalking-banyandb/pkg/convert" ) @@ -55,19 +53,16 @@ type Position struct { Shard string Segment string Block string - KV string } -// Labels converts Position to Prom Labels. -func (p Position) Labels() prometheus.Labels { - return prometheus.Labels{ - "module": p.Module, - "database": p.Database, - "shard": p.Shard, - "seg": p.Segment, - "block": p.Block, - "kv": p.KV, - } +// LabelNames returns the label names of Position. +func LabelNames() []string { + return []string{"module", "database", "shard", "seg", "block"} +} + +// LabelValues returns the label values of Position. +func (p Position) LabelValues() []string { + return []string{p.Module, p.Database, p.Shard, p.Segment, p.Block} } // SetPosition sets a position returned from fn to attach it to ctx, then return a new context. diff --git a/banyand/Dockerfile b/banyand/Dockerfile index 3b5af6ba3..032d63732 100644 --- a/banyand/Dockerfile +++ b/banyand/Dockerfile @@ -39,7 +39,7 @@ FROM base AS builder RUN --mount=target=. 
\ --mount=type=cache,target=/root/.cache/go-build \ - BUILD_DIR=/out make -C banyand all + BUILD_DIR=/out BUILD_TAGS=prometheus make -C banyand all FROM alpine:edge AS certs RUN apk add --no-cache ca-certificates && update-ca-certificates diff --git a/banyand/internal/cmd/standalone.go b/banyand/internal/cmd/standalone.go index 2f3b2ad47..795a9dc6b 100644 --- a/banyand/internal/cmd/standalone.go +++ b/banyand/internal/cmd/standalone.go @@ -35,6 +35,7 @@ import ( "github.com/apache/skywalking-banyandb/banyand/stream" "github.com/apache/skywalking-banyandb/pkg/config" "github.com/apache/skywalking-banyandb/pkg/logger" + "github.com/apache/skywalking-banyandb/pkg/meter" "github.com/apache/skywalking-banyandb/pkg/run" "github.com/apache/skywalking-banyandb/pkg/signal" "github.com/apache/skywalking-banyandb/pkg/version" @@ -77,8 +78,7 @@ func newStandaloneCmd() *cobra.Command { metricSvc := observability.NewMetricService() httpServer := http.NewService() - // Meta the run Group units. - g.Register( + units := []run.Unit{ new(signal.Handler), repo, pipeline, @@ -87,10 +87,15 @@ func newStandaloneCmd() *cobra.Command { streamSvc, q, tcp, - metricSvc, - profSvc, httpServer, - ) + profSvc, + } + _, noMetricProvider := observability.NewMeterProvider(observability.RootScope).(meter.NoopProvider) + if !noMetricProvider { + units = append(units, metricSvc) + } + // Meta the run Group units. + g.Register(units...) 
logging := logger.Logging{} standaloneCmd := &cobra.Command{ Use: "standalone", diff --git a/banyand/kv/badger.go b/banyand/kv/badger.go index 7bc7ee993..8a6bf80b1 100644 --- a/banyand/kv/badger.go +++ b/banyand/kv/badger.go @@ -29,7 +29,6 @@ import ( "github.com/dgraph-io/badger/v3/skl" "github.com/dgraph-io/badger/v3/y" - "github.com/apache/skywalking-banyandb/banyand/observability" "github.com/apache/skywalking-banyandb/pkg/encoding" "github.com/apache/skywalking-banyandb/pkg/logger" ) @@ -54,10 +53,6 @@ func (b *badgerTSS) Handover(skl *skl.Skiplist) error { return b.db.HandoverIterator(skl.NewUniIterator(false)) } -func (b *badgerTSS) Stats() (s observability.Statistics) { - return badgerStats(b.db) -} - func (b *badgerTSS) Close() error { if b.db != nil && !b.db.IsClosed() { return b.db.Close() @@ -65,14 +60,6 @@ func (b *badgerTSS) Close() error { return nil } -func badgerStats(db *badger.DB) (s observability.Statistics) { - stat := db.Stats() - return observability.Statistics{ - MemBytes: stat.MemBytes, - MaxMemBytes: db.Opts().MemTableSize, - } -} - type mergedIter struct { delegated Iterator data []byte @@ -128,10 +115,6 @@ type badgerDB struct { dbOpts badger.Options } -func (b *badgerDB) Stats() observability.Statistics { - return badgerStats(b.db) -} - func (b *badgerDB) Scan(prefix, seekKey []byte, opt ScanOpts, f ScanFunc) error { opts := badger.DefaultIteratorOptions opts.PrefetchSize = opt.PrefetchSize diff --git a/banyand/kv/kv.go b/banyand/kv/kv.go index 1a9a7cd62..c2e8a97a7 100644 --- a/banyand/kv/kv.go +++ b/banyand/kv/kv.go @@ -28,7 +28,6 @@ import ( "github.com/dgraph-io/badger/v3/skl" "github.com/pkg/errors" - "github.com/apache/skywalking-banyandb/banyand/observability" "github.com/apache/skywalking-banyandb/pkg/encoding" "github.com/apache/skywalking-banyandb/pkg/logger" ) @@ -71,7 +70,6 @@ type Reader interface { // Store is a common kv storage with auto-generated key. 
type Store interface { - observability.Observable io.Closer writer Reader @@ -85,7 +83,6 @@ type TimeSeriesReader interface { // TimeSeriesStore is time series storage. type TimeSeriesStore interface { - observability.Observable io.Closer Handover(skl *skl.Skiplist) error TimeSeriesReader @@ -163,7 +160,6 @@ type Iterable interface { // IndexStore allows writing and reading index format data. type IndexStore interface { - observability.Observable Iterable Reader Close() error diff --git a/banyand/observability/meter_noop.go b/banyand/observability/meter_noop.go new file mode 100644 index 000000000..bc25b6e63 --- /dev/null +++ b/banyand/observability/meter_noop.go @@ -0,0 +1,30 @@ +//go:build !prometheus +// +build !prometheus + +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package observability + +import ( + "github.com/apache/skywalking-banyandb/pkg/meter" +) + +// NewMeterProvider returns a meter.Provider based on the given scope. 
+func NewMeterProvider(_ meter.Scope) meter.Provider { + return meter.NoopProvider{} +} diff --git a/banyand/observability/meter_prom.go b/banyand/observability/meter_prom.go new file mode 100644 index 000000000..1047e99ff --- /dev/null +++ b/banyand/observability/meter_prom.go @@ -0,0 +1,31 @@ +//go:build prometheus +// +build prometheus + +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package observability + +import ( + "github.com/apache/skywalking-banyandb/pkg/meter" + "github.com/apache/skywalking-banyandb/pkg/meter/prom" +) + +// NewMeterProvider returns a meter.Provider based on the given scope. 
+func NewMeterProvider(scope meter.Scope) meter.Provider { + return prom.NewProvider(scope) +} diff --git a/banyand/observability/metric.go b/banyand/observability/metric.go index 943023786..d34b7a5d5 100644 --- a/banyand/observability/metric.go +++ b/banyand/observability/metric.go @@ -24,12 +24,16 @@ import ( "github.com/prometheus/client_golang/prometheus/promhttp" "github.com/apache/skywalking-banyandb/pkg/logger" + "github.com/apache/skywalking-banyandb/pkg/meter" "github.com/apache/skywalking-banyandb/pkg/run" ) var ( _ run.Service = (*metricService)(nil) _ run.Config = (*metricService)(nil) + + // RootScope is the root scope for all metrics. + RootScope = meter.NewHierarchicalScope("banyandb", "_") ) // NewMetricService returns a metric service. diff --git a/banyand/observability/type.go b/banyand/observability/type.go index d6e4b0995..b400c168a 100644 --- a/banyand/observability/type.go +++ b/banyand/observability/type.go @@ -21,14 +21,3 @@ package observability import "errors" var errNoAddr = errors.New("no address") - -// Statistics represents a sample of a module. -type Statistics struct { - MemBytes int64 - MaxMemBytes int64 -} - -// Observable allows sampling. 
-type Observable interface { - Stats() Statistics -} diff --git a/banyand/tsdb/block.go b/banyand/tsdb/block.go index 1357e4582..53f29f47e 100644 --- a/banyand/tsdb/block.go +++ b/banyand/tsdb/block.go @@ -35,7 +35,6 @@ import ( "github.com/apache/skywalking-banyandb/api/common" "github.com/apache/skywalking-banyandb/banyand/kv" - "github.com/apache/skywalking-banyandb/banyand/observability" "github.com/apache/skywalking-banyandb/banyand/tsdb/bucket" "github.com/apache/skywalking-banyandb/pkg/index" "github.com/apache/skywalking-banyandb/pkg/index/inverted" @@ -161,7 +160,7 @@ func (b *block) options(ctx context.Context) { Logger: b.l.Named(componentSecondInvertedIdx), BatchWaitSec: options.BlockInvertedIndex.BatchWaitSec, } - lsmMemSize := bufferSize / 8 + lsmMemSize := bufferSize / 4 if lsmMemSize < defaultKVMemorySize { lsmMemSize = defaultKVMemorySize } @@ -210,7 +209,7 @@ func (b *block) openBuffer() (err error) { if b.buffer != nil { return nil } - if b.buffer, err = NewBuffer(b.l, int(b.openOpts.bufferSize/defaultNumBufferShards), + if b.buffer, err = NewBuffer(b.l, b.position, int(b.openOpts.bufferSize/defaultNumBufferShards), defaultWriteConcurrency, defaultNumBufferShards, b.flush); err != nil { return err } @@ -358,21 +357,6 @@ func (b *block) String() string { return fmt.Sprintf("BlockID-%s-%s", b.segSuffix, b.suffix) } -func (b *block) stats() (names []string, stats []observability.Statistics) { - if b.Closed() { - stats = make([]observability.Statistics, 0) - return - } - bnn, bss := b.buffer.Stats() - if bnn != nil { - names = append(names, bnn...) - stats = append(stats, bss...) 
- } - names = append(names, componentSecondInvertedIdx, componentSecondLSMIdx) - stats = append(stats, b.invertedIndex.Stats(), b.lsmIndex.Stats()) - return names, stats -} - func (b *block) Get(key []byte, ts uint64) ([]byte, error) { if v, ok := b.buffer.Read(key, time.Unix(0, int64(ts))); ok { return v, nil diff --git a/banyand/tsdb/buffer.go b/banyand/tsdb/buffer.go index 75ba8e967..001501df5 100644 --- a/banyand/tsdb/buffer.go +++ b/banyand/tsdb/buffer.go @@ -26,9 +26,11 @@ import ( "github.com/dgraph-io/badger/v3/skl" "github.com/dgraph-io/badger/v3/y" + "github.com/apache/skywalking-banyandb/api/common" "github.com/apache/skywalking-banyandb/banyand/observability" "github.com/apache/skywalking-banyandb/pkg/convert" "github.com/apache/skywalking-banyandb/pkg/logger" + "github.com/apache/skywalking-banyandb/pkg/meter" "github.com/apache/skywalking-banyandb/pkg/run" ) @@ -37,6 +39,21 @@ const ( nodeAlign = int(unsafe.Sizeof(uint64(0))) - 1 ) +var ( + bufferMeterProvider meter.Provider + maxBytes meter.Gauge + immutableBytes meter.Gauge + mutableBytes meter.Gauge +) + +func init() { + bufferMeterProvider = observability.NewMeterProvider(meterTSDB.SubScope("buffer")) + labelNames := append(common.LabelNames(), "bucket") + maxBytes = bufferMeterProvider.Gauge("max_bytes", labelNames...) + mutableBytes = bufferMeterProvider.Gauge("mutable_bytes", labelNames...) + immutableBytes = bufferMeterProvider.Gauge("immutable_bytes", labelNames...) +} + type operation struct { key []byte value []byte @@ -57,6 +74,7 @@ type bufferShardBucket struct { flushWaitGroup *sync.WaitGroup log *logger.Logger immutables []*skl.Skiplist + labelValues []string index int capacity int mutex sync.RWMutex @@ -75,7 +93,7 @@ type Buffer struct { } // NewBuffer creates a new Buffer instance with the given parameters. 
-func NewBuffer(log *logger.Logger, flushSize, writeConcurrency, numShards int, onFlushFn onFlush) (*Buffer, error) { +func NewBuffer(log *logger.Logger, position common.Position, flushSize, writeConcurrency, numShards int, onFlushFn onFlush) (*Buffer, error) { buckets := make([]bufferShardBucket, numShards) buffer := &Buffer{ buckets: buckets, @@ -96,8 +114,10 @@ func NewBuffer(log *logger.Logger, flushSize, writeConcurrency, numShards int, o writeWaitGroup: &buffer.writeWaitGroup, flushWaitGroup: &buffer.flushWaitGroup, log: buffer.log.Named(fmt.Sprintf("shard-%d", i)), + labelValues: append(position.LabelValues(), fmt.Sprintf("%d", i)), } buckets[i].start(onFlushFn) + maxBytes.Set(float64(flushSize), buckets[i].labelValues...) } return buffer, nil } @@ -162,36 +182,6 @@ func (b *Buffer) Close() error { return nil } -// Stats returns the statistics for the buffer. -func (b *Buffer) Stats() ([]string, []observability.Statistics) { - if b == nil || !b.entryCloser.AddRunning() { - return nil, nil - } - names := make([]string, b.numShards) - stats := make([]observability.Statistics, b.numShards) - size := func(bucket *bufferShardBucket) (size int64, maxSize int64) { - ll, deferFn := bucket.getAll() - defer deferFn() - for _, l := range ll { - if l == nil { - continue - } - size += l.MemSize() - maxSize += int64(bucket.capacity) - } - return - } - for i := 0; i < b.numShards; i++ { - names[i] = fmt.Sprintf("buffer-%d", i) - size, maxSize := size(&b.buckets[i]) - stats[i] = observability.Statistics{ - MemBytes: size, - MaxMemBytes: maxSize, - } - } - return names, stats -} - func (b *Buffer) getShardIndex(key []byte) uint64 { return convert.Hash(key) % uint64(b.numShards) } @@ -219,14 +209,20 @@ func (bsb *bufferShardBucket) start(onFlushFn onFlush) { defer bsb.flushWaitGroup.Done() for event := range bsb.flushCh { oldSkipList := event.data - if err := onFlushFn(bsb.index, oldSkipList); err != nil { - bsb.log.Err(err).Msg("flushing immutable buffer failed") - continue 
+ memSize := oldSkipList.MemSize() + for { + if err := onFlushFn(bsb.index, oldSkipList); err != nil { + bsb.log.Err(err).Msg("flushing immutable buffer failed. Retrying...") + time.Sleep(time.Second) + continue + } + break } bsb.mutex.Lock() bsb.immutables = bsb.immutables[1:] - oldSkipList.DecrRef() bsb.mutex.Unlock() + oldSkipList.DecrRef() + immutableBytes.Add(float64(-memSize), bsb.labelValues...) } }() go func() { @@ -236,22 +232,34 @@ func (bsb *bufferShardBucket) start(onFlushFn onFlush) { k := y.KeyWithTs(op.key, op.epoch) v := y.ValueStruct{Value: op.value} volume += len(k) + int(v.EncodedSize()) + skl.MaxNodeSize + nodeAlign - if volume >= bsb.capacity || bsb.mutable.MemSize() >= int64(bsb.capacity) { - select { - case bsb.flushCh <- flushEvent{data: bsb.mutable}: - default: - } + memSize := bsb.mutable.MemSize() + mutableBytes.Set(float64(memSize), bsb.labelValues...) + if volume >= bsb.capacity || memSize >= int64(bsb.capacity) { + bsb.triggerFlushing() volume = 0 - bsb.mutex.Lock() - bsb.swap() - bsb.mutex.Unlock() } bsb.mutable.Put(k, v) } }() } +func (bsb *bufferShardBucket) triggerFlushing() { + for { + select { + case bsb.flushCh <- flushEvent{data: bsb.mutable}: + bsb.mutex.Lock() + defer bsb.mutex.Unlock() + bsb.swap() + return + default: + } + time.Sleep(10 * time.Second) + } +} + func (bsb *bufferShardBucket) swap() { bsb.immutables = append(bsb.immutables, bsb.mutable) + // Count the retired skiplist's size before bsb.mutable is replaced by an empty one; the flush goroutine subtracts this same size once the list is flushed. + immutableBytes.Add(float64(bsb.mutable.MemSize()), bsb.labelValues...) bsb.mutable = skl.NewSkiplist(int64(bsb.capacity)) } diff --git a/banyand/tsdb/buffer_test.go b/banyand/tsdb/buffer_test.go index 41e654e59..bb9483720 100644 --- a/banyand/tsdb/buffer_test.go +++ b/banyand/tsdb/buffer_test.go @@ -29,6 +29,7 @@ import ( .
"github.com/onsi/gomega" "github.com/onsi/gomega/gleak" + "github.com/apache/skywalking-banyandb/api/common" "github.com/apache/skywalking-banyandb/banyand/tsdb" "github.com/apache/skywalking-banyandb/pkg/logger" "github.com/apache/skywalking-banyandb/pkg/test/flags" @@ -50,7 +51,7 @@ var _ = Describe("Buffer", func() { Context("Write and Read", func() { BeforeEach(func() { var err error - buffer, err = tsdb.NewBuffer(log, 1024*1024, 16, 4, func(shardIndex int, skl *skl.Skiplist) error { + buffer, err = tsdb.NewBuffer(log, common.Position{}, 1024*1024, 16, 4, func(shardIndex int, skl *skl.Skiplist) error { return nil }) Expect(err).ToNot(HaveOccurred()) @@ -117,7 +118,7 @@ var _ = Describe("Buffer", func() { }(ch) } - buffer, err := tsdb.NewBuffer(log, 1024, 16, numShards, onFlushFn) + buffer, err := tsdb.NewBuffer(log, common.Position{}, 1024, 16, numShards, onFlushFn) defer func() { _ = buffer.Close() }() diff --git a/banyand/tsdb/metric.go b/banyand/tsdb/metric.go deleted file mode 100644 index bdd0d91f7..000000000 --- a/banyand/tsdb/metric.go +++ /dev/null @@ -1,106 +0,0 @@ -// Licensed to Apache Software Foundation (ASF) under one or more contributor -// license agreements. See the NOTICE file distributed with -// this work for additional information regarding copyright -// ownership. Apache Software Foundation (ASF) licenses this file to you under -// the Apache License, Version 2.0 (the "License"); you may -// not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. 
- -package tsdb - -import ( - "fmt" - "time" - - "github.com/pkg/errors" - "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/promauto" - - "github.com/apache/skywalking-banyandb/banyand/observability" - "github.com/apache/skywalking-banyandb/pkg/logger" -) - -var ( - mtBytes *prometheus.GaugeVec - maxMtBytes *prometheus.GaugeVec -) - -func init() { - labels := []string{"module", "database", "shard", "component"} - mtBytes = promauto.NewGaugeVec( - prometheus.GaugeOpts{ - Name: "banyand_memtables_bytes", - Help: "Memory table size in bytes", - }, - labels, - ) - maxMtBytes = promauto.NewGaugeVec( - prometheus.GaugeOpts{ - Name: "banyand_memtables_max_bytes", - Help: "Maximum amount of memory table available in bytes", - }, - labels, - ) -} - -func (s *shard) stat(_ time.Time, _ *logger.Logger) (r bool) { - r = true - defer func() { - if r := recover(); r != nil { - err, ok := r.(error) - if !ok { - err = fmt.Errorf("%v", r) - } - s.l.Warn().Err(errors.WithStack(err)).Msg("recovered") - } - }() - seriesStat := s.seriesDatabase.Stats() - s.curry(mtBytes).WithLabelValues("series").Set(float64(seriesStat.MemBytes)) - s.curry(maxMtBytes).WithLabelValues("series").Set(float64(seriesStat.MaxMemBytes)) - segStats := observability.Statistics{} - blockStats := make(map[string]observability.Statistics) - for _, seg := range s.segmentController.segments() { - segStat := seg.Stats() - segStats.MaxMemBytes += segStat.MaxMemBytes - segStats.MemBytes += segStat.MemBytes - for _, b := range seg.blockController.blocks() { - if b.Closed() { - continue - } - names, bss := b.stats() - for i, bs := range bss { - bsc, ok := blockStats[names[i]] - if ok { - bsc.MaxMemBytes += bs.MaxMemBytes - bsc.MemBytes += bs.MemBytes - } else { - blockStats[names[i]] = bs - } - } - } - } - s.curry(mtBytes).WithLabelValues("global-index").Set(float64(segStats.MemBytes)) - 
s.curry(maxMtBytes).WithLabelValues("global-index").Set(float64(segStats.MaxMemBytes)) - for name, bs := range blockStats { - s.curry(mtBytes).WithLabelValues(name).Set(float64(bs.MemBytes)) - s.curry(maxMtBytes).WithLabelValues(name).Set(float64(bs.MaxMemBytes)) - } - return -} - -func (s *shard) curry(gv *prometheus.GaugeVec) *prometheus.GaugeVec { - return gv.MustCurryWith(prometheus.Labels{ - "module": s.position.Module, - "database": s.position.Database, - "shard": s.position.Shard, - }) -} diff --git a/banyand/tsdb/scope.go b/banyand/tsdb/scope.go index 952fd3c3e..15aebb9b8 100644 --- a/banyand/tsdb/scope.go +++ b/banyand/tsdb/scope.go @@ -21,7 +21,6 @@ import ( "context" "github.com/apache/skywalking-banyandb/api/common" - "github.com/apache/skywalking-banyandb/banyand/observability" ) var _ Shard = (*scopedShard)(nil) @@ -74,10 +73,6 @@ type scopedSeriesDatabase struct { scope Entry } -func (sdd *scopedSeriesDatabase) Stats() observability.Statistics { - return sdd.delegated.Stats() -} - func (sdd *scopedSeriesDatabase) Close() error { return nil } diff --git a/banyand/tsdb/segment.go b/banyand/tsdb/segment.go index e0cf74c28..61122f217 100644 --- a/banyand/tsdb/segment.go +++ b/banyand/tsdb/segment.go @@ -26,7 +26,6 @@ import ( "time" "github.com/apache/skywalking-banyandb/banyand/kv" - "github.com/apache/skywalking-banyandb/banyand/observability" "github.com/apache/skywalking-banyandb/banyand/tsdb/bucket" "github.com/apache/skywalking-banyandb/pkg/logger" "github.com/apache/skywalking-banyandb/pkg/timestamp" @@ -136,10 +135,3 @@ func (s *segment) delete(ctx context.Context) error { func (s *segment) String() string { return "SegID-" + s.suffix } - -func (s *segment) Stats() observability.Statistics { - if s.globalIndex == nil { - return observability.Statistics{} - } - return s.globalIndex.Stats() -} diff --git a/banyand/tsdb/seriesdb.go b/banyand/tsdb/seriesdb.go index 3fc996c41..d715c0a34 100644 --- a/banyand/tsdb/seriesdb.go +++ 
b/banyand/tsdb/seriesdb.go @@ -34,7 +34,6 @@ import ( "github.com/apache/skywalking-banyandb/api/common" modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1" "github.com/apache/skywalking-banyandb/banyand/kv" - "github.com/apache/skywalking-banyandb/banyand/observability" "github.com/apache/skywalking-banyandb/pkg/convert" "github.com/apache/skywalking-banyandb/pkg/logger" pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1" @@ -248,7 +247,6 @@ func prepend(src []byte, entry []byte) []byte { // SeriesDatabase allows retrieving series. type SeriesDatabase interface { - observability.Observable io.Closer GetByID(id common.SeriesID) (Series, error) Get(key []byte, entityValues EntityValues) (Series, error) @@ -489,10 +487,6 @@ func (s *seriesDB) context() context.Context { return context.WithValue(context.Background(), logger.ContextKey, s.l) } -func (s *seriesDB) Stats() observability.Statistics { - return s.seriesMetadata.Stats() -} - func (s *seriesDB) Close() error { return s.seriesMetadata.Close() } diff --git a/banyand/tsdb/shard.go b/banyand/tsdb/shard.go index 2bd94a7d6..3f561d694 100644 --- a/banyand/tsdb/shard.go +++ b/banyand/tsdb/shard.go @@ -25,7 +25,6 @@ import ( "time" "github.com/pkg/errors" - "github.com/robfig/cron/v3" "go.uber.org/multierr" "github.com/apache/skywalking-banyandb/api/common" @@ -108,9 +107,6 @@ func OpenShard(ctx context.Context, id common.ShardID, if position != nil { s.position = position.(common.Position) } - if err := scheduler.Register("stat", cron.Descriptor, "@every 5s", s.stat); err != nil { - return nil, err - } retentionTask := newRetentionTask(s.segmentController, ttl) if err := scheduler.Register("retention", retentionTask.option, retentionTask.expr, retentionTask.run); err != nil { return nil, err diff --git a/banyand/tsdb/tsdb.go b/banyand/tsdb/tsdb.go index 65d83d474..daa7b4a96 100644 --- a/banyand/tsdb/tsdb.go +++ b/banyand/tsdb/tsdb.go @@ -36,6 +36,7 @@ import ( "go.uber.org/multierr" 
"github.com/apache/skywalking-banyandb/api/common" + "github.com/apache/skywalking-banyandb/banyand/observability" "github.com/apache/skywalking-banyandb/pkg/convert" "github.com/apache/skywalking-banyandb/pkg/encoding" "github.com/apache/skywalking-banyandb/pkg/logger" @@ -64,7 +65,9 @@ var ( errInvalidShardID = errors.New("invalid shard id") errOpenDatabase = errors.New("fails to open the database") - optionsKey = contextOptionsKey{} + optionsKey = contextOptionsKey{} + meterStorage = observability.RootScope.SubScope("storage") + meterTSDB = meterStorage.SubScope("tsdb") ) type contextOptionsKey struct{} diff --git a/docs/README.md b/docs/README.md index cd6c79d37..9b759f879 100644 --- a/docs/README.md +++ b/docs/README.md @@ -8,6 +8,7 @@ Here you can learn all you need to know about BanyanDB. - **Clients**. Some native clients to access Banyand. - **Schema**. Pivotal database native resources. - **CRUD Operations**. To create, read, update, and delete data points or entities on resources in the schema. +- **Observability**. Learn how to effectively monitor, diagnose and optimize Banyand. You might also find these links interesting: diff --git a/docs/menu.yml b/docs/menu.yml index bc2a9b39e..a586a3073 100644 --- a/docs/menu.yml +++ b/docs/menu.yml @@ -22,6 +22,8 @@ catalog: path: "/installation" - name: "Clients" path: "/clients" + - name: "Observability" + path: "/observability" - name: "Concept" catalog: - name: "Data Model" diff --git a/docs/observability.md b/docs/observability.md new file mode 100644 index 000000000..af0c38c35 --- /dev/null +++ b/docs/observability.md @@ -0,0 +1,27 @@ +# Observability + +This document outlines the observability features of BanyanDB, which include metrics, profiling, and tracing. These features help monitor and understand the performance, behavior, and overall health of BanyanDB. + +## Metrics + +BanyanDB has built-in support for metrics collection through the use of build tags. 
The metrics provider can be enabled by specifying the build tag during the compilation process. + +Currently, there is only one supported metrics provider: `Prometheus`. To use Prometheus as the metrics client, include the `prometheus` build tag when building BanyanDB: + +`BUILD_TAGS=prometheus make -C banyand banyand-server` + +If no build tag is specified, the metrics server will not be started, and no metrics will be collected: + +`make -C banyand banyand-server` + +When the Prometheus metrics provider is enabled, the metrics server listens on port `2121`. This allows Prometheus to scrape metrics data from BanyanDB for monitoring and analysis. + +The Docker image is built with the `prometheus` build tag to facilitate cloud-native operations and simplify deployment on Kubernetes. This allows users to directly deploy the Docker image onto their Kubernetes cluster without having to rebuild it with the `prometheus` build tag. + +## Profiling + +TODO: Add details about the profiling support in BanyanDB, such as how to enable profiling, available profiling tools, and how to analyze profiling data. + +## Tracing + +TODO: Add details about the tracing support in BanyanDB, such as how to enable tracing, available tracing tools, and how to analyze tracing data. diff --git a/pkg/index/index.go b/pkg/index/index.go index 1862231f4..95483d808 100644 --- a/pkg/index/index.go +++ b/pkg/index/index.go @@ -28,7 +28,6 @@ import ( "github.com/apache/skywalking-banyandb/api/common" databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1" modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1" - "github.com/apache/skywalking-banyandb/banyand/observability" "github.com/apache/skywalking-banyandb/pkg/convert" "github.com/apache/skywalking-banyandb/pkg/index/posting" ) @@ -205,7 +204,6 @@ type Searcher interface { // Store is an abstract of a index repository.
type Store interface { - observability.Observable io.Closer Writer Searcher diff --git a/pkg/index/inverted/inverted.go b/pkg/index/inverted/inverted.go index ff4cd4d41..9574f023b 100644 --- a/pkg/index/inverted/inverted.go +++ b/pkg/index/inverted/inverted.go @@ -36,7 +36,6 @@ import ( "github.com/apache/skywalking-banyandb/api/common" databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1" modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1" - "github.com/apache/skywalking-banyandb/banyand/observability" "github.com/apache/skywalking-banyandb/pkg/convert" "github.com/apache/skywalking-banyandb/pkg/index" "github.com/apache/skywalking-banyandb/pkg/index/posting" @@ -122,10 +121,6 @@ func NewStore(opts StoreOpts) (index.Store, error) { return s, nil } -func (s *store) Stats() observability.Statistics { - return observability.Statistics{} -} - func (s *store) Close() error { s.closer.CloseThenWait() return s.writer.Close() diff --git a/pkg/index/lsm/lsm.go b/pkg/index/lsm/lsm.go index 2737d9739..b800aaf2f 100644 --- a/pkg/index/lsm/lsm.go +++ b/pkg/index/lsm/lsm.go @@ -23,7 +23,6 @@ import ( "github.com/apache/skywalking-banyandb/api/common" "github.com/apache/skywalking-banyandb/banyand/kv" - "github.com/apache/skywalking-banyandb/banyand/observability" "github.com/apache/skywalking-banyandb/pkg/convert" "github.com/apache/skywalking-banyandb/pkg/index" "github.com/apache/skywalking-banyandb/pkg/logger" @@ -36,10 +35,6 @@ type store struct { l *logger.Logger } -func (s *store) Stats() observability.Statistics { - return s.lsm.Stats() -} - func (s *store) Close() error { return s.lsm.Close() } diff --git a/pkg/meter/meter.go b/pkg/meter/meter.go new file mode 100644 index 000000000..86fa1fa42 --- /dev/null +++ b/pkg/meter/meter.go @@ -0,0 +1,101 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. 
See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// Package meter provides a simple meter system for metrics. The metrics are aggregated by the meter provider. +package meter + +type ( + // Buckets is a slice of bucket boundaries. + Buckets []float64 + + // LabelPairs is a map of label names to label values, which is used to identify a metric. + LabelPairs map[string]string +) + +// Merge merges the given label pairs with the current label pairs. +func (p LabelPairs) Merge(other LabelPairs) LabelPairs { + result := make(LabelPairs, len(p)+len(other)) + for k, v := range p { + result[k] = v + } + for k, v := range other { + result[k] = v + } + return result +} + +// Provider is the interface for a metrics provider, which is responsible for creating metrics. +type Provider interface { + Counter(name string, labelNames ...string) Counter + Gauge(name string, labelNames ...string) Gauge + Histogram(name string, buckets Buckets, labelNames ...string) Histogram +} + +// Scope is a namespace wrapper for metrics. +type Scope interface { + ConstLabels(labels LabelPairs) Scope + SubScope(name string) Scope + GetNamespace() string + GetLabels() LabelPairs +} + +// Counter is a metric that represents a single numerical value that only ever goes up. 
+type Counter interface { + Inc(delta float64, labelValues ...string) +} + +// Gauge is a metric that represents a single numerical value that can arbitrarily go up and down. +type Gauge interface { + Set(value float64, labelValues ...string) + Add(delta float64, labelValues ...string) +} + +// Histogram is a metric that represents the statistical distribution of a set of values. +type Histogram interface { + Observe(value float64, labelValues ...string) +} + +type noopCounter struct{} + +func (noopCounter) Inc(_ float64, _ ...string) {} + +type noopGauge struct{} + +func (noopGauge) Set(_ float64, _ ...string) {} +func (noopGauge) Add(_ float64, _ ...string) {} + +type noopHistogram struct{} + +func (noopHistogram) Observe(_ float64, _ ...string) {} + +// NoopProvider is a no-op implementation of the Provider interface. +type NoopProvider struct{} + +// Counter returns a no-op implementation of the Counter interface. +func (NoopProvider) Counter(_ string, _ ...string) Counter { + return noopCounter{} +} + +// Gauge returns a no-op implementation of the Gauge interface. +func (NoopProvider) Gauge(_ string, _ ...string) Gauge { + return noopGauge{} +} + +// Histogram returns a no-op implementation of the Histogram interface. +func (NoopProvider) Histogram(_ string, _ Buckets, _ ...string) Histogram { + return noopHistogram{} +} diff --git a/pkg/meter/prom/instruments.go b/pkg/meter/prom/instruments.go new file mode 100644 index 000000000..6ea434c44 --- /dev/null +++ b/pkg/meter/prom/instruments.go @@ -0,0 +1,49 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// Package prom provides a prometheus implementation for the meter system. +package prom + +import "github.com/prometheus/client_golang/prometheus" + +type counter struct { + counter *prometheus.CounterVec +} + +func (c *counter) Inc(delta float64, labelValues ...string) { + c.counter.WithLabelValues(labelValues...).Add(delta) +} + +type gauge struct { + gauge *prometheus.GaugeVec +} + +func (g *gauge) Set(value float64, labelValues ...string) { + g.gauge.WithLabelValues(labelValues...).Set(value) +} + +func (g *gauge) Add(delta float64, labelValues ...string) { + g.gauge.WithLabelValues(labelValues...).Add(delta) +} + +type histogram struct { + histogram *prometheus.HistogramVec +} + +func (h *histogram) Observe(value float64, labelValues ...string) { + h.histogram.WithLabelValues(labelValues...).Observe(value) +} diff --git a/pkg/meter/prom/prom.go b/pkg/meter/prom/prom.go new file mode 100644 index 000000000..50583732f --- /dev/null +++ b/pkg/meter/prom/prom.go @@ -0,0 +1,80 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package prom + +import ( + "unsafe" + + "github.com/prometheus/client_golang/prometheus" + + "github.com/apache/skywalking-banyandb/pkg/meter" +) + +// Provider is a prometheus provider. +type provider struct { + scope meter.Scope +} + +// NewProvider creates a new prometheus provider with given meter.Scope. +func NewProvider(scope meter.Scope) meter.Provider { + return &provider{ + scope: scope, + } +} + +// Counter returns a prometheus counter. +func (p *provider) Counter(name string, labels ...string) meter.Counter { + return &counter{ + counter: prometheus.NewCounterVec(prometheus.CounterOpts{ + Name: name, + Help: name, + ConstLabels: convertLabels(p.scope.GetLabels()), + }, labels), + } +} + +// Gauge returns a prometheus gauge. +func (p *provider) Gauge(name string, labels ...string) meter.Gauge { + return &gauge{ + gauge: prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Name: name, + Help: name, + ConstLabels: convertLabels(p.scope.GetLabels()), + }, labels), + } +} + +// Histogram returns a prometheus histogram. +func (p *provider) Histogram(name string, buckets meter.Buckets, labels ...string) meter.Histogram { + return &histogram{ + histogram: prometheus.NewHistogramVec(prometheus.HistogramOpts{ + Name: name, + Help: name, + ConstLabels: convertLabels(p.scope.GetLabels()), + Buckets: buckets, + }, labels), + } +} + +// convertLabels converts a map of labels to a prometheus.Labels. 
+func convertLabels(labels meter.LabelPairs) prometheus.Labels { + if labels == nil { + return nil + } + return *(*prometheus.Labels)(unsafe.Pointer(&labels)) +} diff --git a/pkg/meter/scope.go b/pkg/meter/scope.go new file mode 100644 index 000000000..4a3d7720e --- /dev/null +++ b/pkg/meter/scope.go @@ -0,0 +1,78 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package meter + +import ( + "sync" +) + +// HierarchicalScope is a Scope implementation that supports hierarchical scopes. +type HierarchicalScope struct { + parent *HierarchicalScope + labels LabelPairs + sep string + name string + mu sync.RWMutex +} + +// NewHierarchicalScope creates a new hierarchical scope. +func NewHierarchicalScope(name, sep string) Scope { + return &HierarchicalScope{sep: sep, name: name} +} + +// ConstLabels merges the given labels with the labels of the parent scope. +func (s *HierarchicalScope) ConstLabels(labels LabelPairs) Scope { + s.mu.Lock() + defer s.mu.Unlock() + if s.parent != nil { + labels = s.parent.GetLabels().Merge(labels) + } + s.labels = labels + return s +} + +// SubScope creates a new sub-scope with the given name. 
+func (s *HierarchicalScope) SubScope(name string) Scope { + s.mu.Lock() + defer s.mu.Unlock() + + return &HierarchicalScope{ + parent: s, + name: name, + sep: s.sep, + } +} + +// GetNamespace returns the namespace of this scope. +func (s *HierarchicalScope) GetNamespace() string { + s.mu.RLock() + defer s.mu.RUnlock() + + if s.parent == nil { + return s.name + } + return s.parent.GetNamespace() + s.sep + s.name +} + +// GetLabels returns the labels of this scope. +func (s *HierarchicalScope) GetLabels() LabelPairs { + s.mu.RLock() + defer s.mu.RUnlock() + + return s.labels +} diff --git a/pkg/meter/scope_test.go b/pkg/meter/scope_test.go new file mode 100644 index 000000000..bbeb4efe4 --- /dev/null +++ b/pkg/meter/scope_test.go @@ -0,0 +1,50 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package meter_test + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/apache/skywalking-banyandb/pkg/meter" +) + +// TestHierarchicalScope tests the hierarchical scope. +func TestHierarchicalScope(t *testing.T) { + sep := "." 
+ scope := meter.NewHierarchicalScope("test", sep) + scope = scope.ConstLabels(meter.LabelPairs{ + "env": "dev", + "version": "1.0", + }) + scope = scope.SubScope("child") + scope = scope.ConstLabels(meter.LabelPairs{ + "version": "2.0", + }) + scope = scope.SubScope("grandchild") + scope = scope.ConstLabels(meter.LabelPairs{ + "version": "3.0", + }) + hs, ok := scope.(*meter.HierarchicalScope) + require.True(t, ok) + assert.Equal(t, "test.child.grandchild", hs.GetNamespace()) + assert.Equal(t, "dev", hs.GetLabels()["env"]) + assert.Equal(t, "3.0", hs.GetLabels()["version"]) +}