Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a simple meter system #264

Merged
merged 1 commit into from
Apr 10, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ Release Notes.
- [UI] Add YAML editor for inputting query criteria.
- Refactor TopN to support `NULL` group while keeping seriesID from the source measure.
- Add a sharded buffer to TSDB to replace Badger's memtable. Badger KV only provides SST.
- Add a meter system to control the internal metrics.

### Chores

Expand Down
21 changes: 8 additions & 13 deletions api/common/id.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@ import (
"context"
"fmt"

"github.com/prometheus/client_golang/prometheus"

"github.com/apache/skywalking-banyandb/pkg/convert"
)

Expand Down Expand Up @@ -55,19 +53,16 @@ type Position struct {
Shard string
Segment string
Block string
KV string
}

// Labels converts Position to Prom Labels.
func (p Position) Labels() prometheus.Labels {
return prometheus.Labels{
"module": p.Module,
"database": p.Database,
"shard": p.Shard,
"seg": p.Segment,
"block": p.Block,
"kv": p.KV,
}
// LabelNames returns the label names of Position.
func LabelNames() []string {
return []string{"module", "database", "shard", "seg", "block"}
}

// LabelValues returns the label values of Position.
func (p Position) LabelValues() []string {
return []string{p.Module, p.Database, p.Shard, p.Segment, p.Block}
}

// SetPosition sets a position returned from fn to attach it to ctx, then return a new context.
Expand Down
2 changes: 1 addition & 1 deletion banyand/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ FROM base AS builder

RUN --mount=target=. \
--mount=type=cache,target=/root/.cache/go-build \
BUILD_DIR=/out make -C banyand all
BUILD_DIR=/out BUILD_TAGS=prometheus make -C banyand all

FROM alpine:edge AS certs
RUN apk add --no-cache ca-certificates && update-ca-certificates
Expand Down
15 changes: 10 additions & 5 deletions banyand/internal/cmd/standalone.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"github.com/apache/skywalking-banyandb/banyand/stream"
"github.com/apache/skywalking-banyandb/pkg/config"
"github.com/apache/skywalking-banyandb/pkg/logger"
"github.com/apache/skywalking-banyandb/pkg/meter"
"github.com/apache/skywalking-banyandb/pkg/run"
"github.com/apache/skywalking-banyandb/pkg/signal"
"github.com/apache/skywalking-banyandb/pkg/version"
Expand Down Expand Up @@ -77,8 +78,7 @@ func newStandaloneCmd() *cobra.Command {
metricSvc := observability.NewMetricService()
httpServer := http.NewService()

// Meta the run Group units.
g.Register(
units := []run.Unit{
new(signal.Handler),
repo,
pipeline,
Expand All @@ -87,10 +87,15 @@ func newStandaloneCmd() *cobra.Command {
streamSvc,
q,
tcp,
metricSvc,
profSvc,
httpServer,
)
profSvc,
}
_, noMetricProvider := observability.NewMeterProvider(observability.RootScope).(meter.NoopProvider)
if !noMetricProvider {
units = append(units, metricSvc)
}
// Meta the run Group units.
g.Register(units...)
logging := logger.Logging{}
standaloneCmd := &cobra.Command{
Use: "standalone",
Expand Down
17 changes: 0 additions & 17 deletions banyand/kv/badger.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ import (
"github.com/dgraph-io/badger/v3/skl"
"github.com/dgraph-io/badger/v3/y"

"github.com/apache/skywalking-banyandb/banyand/observability"
"github.com/apache/skywalking-banyandb/pkg/encoding"
"github.com/apache/skywalking-banyandb/pkg/logger"
)
Expand All @@ -54,25 +53,13 @@ func (b *badgerTSS) Handover(skl *skl.Skiplist) error {
return b.db.HandoverIterator(skl.NewUniIterator(false))
}

func (b *badgerTSS) Stats() (s observability.Statistics) {
return badgerStats(b.db)
}

func (b *badgerTSS) Close() error {
if b.db != nil && !b.db.IsClosed() {
return b.db.Close()
}
return nil
}

func badgerStats(db *badger.DB) (s observability.Statistics) {
stat := db.Stats()
return observability.Statistics{
MemBytes: stat.MemBytes,
MaxMemBytes: db.Opts().MemTableSize,
}
}

type mergedIter struct {
delegated Iterator
data []byte
Expand Down Expand Up @@ -128,10 +115,6 @@ type badgerDB struct {
dbOpts badger.Options
}

func (b *badgerDB) Stats() observability.Statistics {
return badgerStats(b.db)
}

func (b *badgerDB) Scan(prefix, seekKey []byte, opt ScanOpts, f ScanFunc) error {
opts := badger.DefaultIteratorOptions
opts.PrefetchSize = opt.PrefetchSize
Expand Down
4 changes: 0 additions & 4 deletions banyand/kv/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import (
"github.com/dgraph-io/badger/v3/skl"
"github.com/pkg/errors"

"github.com/apache/skywalking-banyandb/banyand/observability"
"github.com/apache/skywalking-banyandb/pkg/encoding"
"github.com/apache/skywalking-banyandb/pkg/logger"
)
Expand Down Expand Up @@ -71,7 +70,6 @@ type Reader interface {

// Store is a common kv storage with auto-generated key.
type Store interface {
observability.Observable
io.Closer
writer
Reader
Expand All @@ -85,7 +83,6 @@ type TimeSeriesReader interface {

// TimeSeriesStore is time series storage.
type TimeSeriesStore interface {
observability.Observable
io.Closer
Handover(skl *skl.Skiplist) error
TimeSeriesReader
Expand Down Expand Up @@ -163,7 +160,6 @@ type Iterable interface {

// IndexStore allows writing and reading index format data.
type IndexStore interface {
observability.Observable
Iterable
Reader
Close() error
Expand Down
30 changes: 30 additions & 0 deletions banyand/observability/meter_noop.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
//go:build !prometheus
// +build !prometheus

// Licensed to Apache Software Foundation (ASF) under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Apache Software Foundation (ASF) licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package observability

import (
"github.com/apache/skywalking-banyandb/pkg/meter"
)

// NewMeterProvider returns a meter.Provider based on the given scope.
func NewMeterProvider(_ meter.Scope) meter.Provider {
return meter.NoopProvider{}
}
31 changes: 31 additions & 0 deletions banyand/observability/meter_prom.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
//go:build prometheus
// +build prometheus

// Licensed to Apache Software Foundation (ASF) under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Apache Software Foundation (ASF) licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package observability

import (
"github.com/apache/skywalking-banyandb/pkg/meter"
"github.com/apache/skywalking-banyandb/pkg/meter/prom"
)

// NewMeterProvider returns a meter.Provider based on the given scope.
func NewMeterProvider(scope meter.Scope) meter.Provider {
return prom.NewProvider(scope)
}
4 changes: 4 additions & 0 deletions banyand/observability/metric.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,16 @@ import (
"github.com/prometheus/client_golang/prometheus/promhttp"

"github.com/apache/skywalking-banyandb/pkg/logger"
"github.com/apache/skywalking-banyandb/pkg/meter"
"github.com/apache/skywalking-banyandb/pkg/run"
)

var (
_ run.Service = (*metricService)(nil)
_ run.Config = (*metricService)(nil)

// RootScope is the root scope for all metrics.
RootScope = meter.NewHierarchicalScope("banyandb", "_")
)

// NewMetricService returns a metric service.
Expand Down
11 changes: 0 additions & 11 deletions banyand/observability/type.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,3 @@ package observability
import "errors"

var errNoAddr = errors.New("no address")

// Statistics represents a sample of a module.
type Statistics struct {
MemBytes int64
MaxMemBytes int64
}

// Observable allows sampling.
type Observable interface {
Stats() Statistics
}
20 changes: 2 additions & 18 deletions banyand/tsdb/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ import (

"github.com/apache/skywalking-banyandb/api/common"
"github.com/apache/skywalking-banyandb/banyand/kv"
"github.com/apache/skywalking-banyandb/banyand/observability"
"github.com/apache/skywalking-banyandb/banyand/tsdb/bucket"
"github.com/apache/skywalking-banyandb/pkg/index"
"github.com/apache/skywalking-banyandb/pkg/index/inverted"
Expand Down Expand Up @@ -161,7 +160,7 @@ func (b *block) options(ctx context.Context) {
Logger: b.l.Named(componentSecondInvertedIdx),
BatchWaitSec: options.BlockInvertedIndex.BatchWaitSec,
}
lsmMemSize := bufferSize / 8
lsmMemSize := bufferSize / 4
if lsmMemSize < defaultKVMemorySize {
lsmMemSize = defaultKVMemorySize
}
Expand Down Expand Up @@ -210,7 +209,7 @@ func (b *block) openBuffer() (err error) {
if b.buffer != nil {
return nil
}
if b.buffer, err = NewBuffer(b.l, int(b.openOpts.bufferSize/defaultNumBufferShards),
if b.buffer, err = NewBuffer(b.l, b.position, int(b.openOpts.bufferSize/defaultNumBufferShards),
defaultWriteConcurrency, defaultNumBufferShards, b.flush); err != nil {
return err
}
Expand Down Expand Up @@ -358,21 +357,6 @@ func (b *block) String() string {
return fmt.Sprintf("BlockID-%s-%s", b.segSuffix, b.suffix)
}

func (b *block) stats() (names []string, stats []observability.Statistics) {
if b.Closed() {
stats = make([]observability.Statistics, 0)
return
}
bnn, bss := b.buffer.Stats()
if bnn != nil {
names = append(names, bnn...)
stats = append(stats, bss...)
}
names = append(names, componentSecondInvertedIdx, componentSecondLSMIdx)
stats = append(stats, b.invertedIndex.Stats(), b.lsmIndex.Stats())
return names, stats
}

func (b *block) Get(key []byte, ts uint64) ([]byte, error) {
if v, ok := b.buffer.Read(key, time.Unix(0, int64(ts))); ok {
return v, nil
Expand Down
Loading