Skip to content

Commit

Permalink
Add several metrics to measure the storage subsystem. (#266)
Browse files Browse the repository at this point in the history
* Add several metrics to measure the storage sub-system.

Signed-off-by: Gao Hongtao <hanahmily@gmail.com>
  • Loading branch information
hanahmily committed Apr 18, 2023
1 parent 09925ff commit 49132fa
Show file tree
Hide file tree
Showing 42 changed files with 1,938 additions and 242 deletions.
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ Release Notes.
- Refactor TopN to support `NULL` group while keeping seriesID from the source measure.
- Add a sharded buffer to TSDB to replace Badger's memtable. Badger KV only provides SST.
- Add a meter system to control the internal metrics.
- Add multiple metrics for measuring the storage subsystem.

### Chores

Expand Down
27 changes: 23 additions & 4 deletions api/common/id.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@ func (s SeriesID) Marshal() []byte {
return convert.Uint64ToBytes(uint64(s))
}

// PositionKey is a context key to store the module position.
var PositionKey = contextPositionKey{}
// positionKey is a context key to store the module position.
var positionKey = contextPositionKey{}

type contextPositionKey struct{}

Expand All @@ -60,21 +60,40 @@ func LabelNames() []string {
return []string{"module", "database", "shard", "seg", "block"}
}

// ShardLabelNames lists the label names attached to shard-level metrics:
// "module", "database" and "shard", in that order. A fresh slice is returned
// on every call.
func ShardLabelNames() []string {
	names := make([]string, 0, 3)
	names = append(names, "module", "database", "shard")
	return names
}

// LabelValues returns the position's fields as metric label values, in the
// order module, database, shard, segment, block.
func (p Position) LabelValues() []string {
	values := make([]string, 0, 5)
	values = append(values, p.Module, p.Database, p.Shard, p.Segment, p.Block)
	return values
}

// ShardLabelValues returns the label values for shard-level metrics: the
// position's module, database and shard, in that order.
func (p Position) ShardLabelValues() []string {
	values := make([]string, 0, 3)
	values = append(values, p.Module, p.Database, p.Shard)
	return values
}

// SetPosition passes the position stored in ctx (or a zero Position) to fn and returns a new context carrying the position fn returns.
func SetPosition(ctx context.Context, fn func(p Position) Position) context.Context {
val := ctx.Value(PositionKey)
val := ctx.Value(positionKey)
var p Position
if val == nil {
p = Position{}
} else {
p = val.(Position)
}
return context.WithValue(ctx, PositionKey, fn(p))
return context.WithValue(ctx, positionKey, fn(p))
}

// GetPosition extracts the Position stored in ctx under the package's private
// context key. It returns the zero Position when ctx carries none.
func GetPosition(ctx context.Context) Position {
	if stored := ctx.Value(positionKey); stored != nil {
		return stored.(Position)
	}
	return Position{}
}

// Error wraps an error msg.
Expand Down
4 changes: 1 addition & 3 deletions banyand/internal/cmd/standalone.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ import (
"github.com/apache/skywalking-banyandb/banyand/stream"
"github.com/apache/skywalking-banyandb/pkg/config"
"github.com/apache/skywalking-banyandb/pkg/logger"
"github.com/apache/skywalking-banyandb/pkg/meter"
"github.com/apache/skywalking-banyandb/pkg/run"
"github.com/apache/skywalking-banyandb/pkg/signal"
"github.com/apache/skywalking-banyandb/pkg/version"
Expand Down Expand Up @@ -90,8 +89,7 @@ func newStandaloneCmd() *cobra.Command {
httpServer,
profSvc,
}
_, noMetricProvider := observability.NewMeterProvider(observability.RootScope).(meter.NoopProvider)
if !noMetricProvider {
if metricSvc != nil {
units = append(units, metricSvc)
}
// Meta the run Group units.
Expand Down
10 changes: 10 additions & 0 deletions banyand/kv/badger.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,11 @@ func (b *badgerTSS) Close() error {
return nil
}

// SizeOnDisk reports the store's on-disk size as the sum of the LSM and value
// log sizes returned by the underlying db.
func (b *badgerTSS) SizeOnDisk() int64 {
	lsm, vlog := b.db.Size()
	total := lsm + vlog
	return total
}

type mergedIter struct {
delegated Iterator
data []byte
Expand Down Expand Up @@ -145,6 +150,11 @@ func (b *badgerDB) Scan(prefix, seekKey []byte, opt ScanOpts, f ScanFunc) error
return nil
}

// SizeOnDisk reports the store's on-disk size as the sum of the LSM and value
// log sizes returned by the underlying db.
func (b *badgerDB) SizeOnDisk() int64 {
	lsm, vlog := b.db.Size()
	total := lsm + vlog
	return total
}

var _ Iterator = (*iterator)(nil)

type iterator struct {
Expand Down
3 changes: 3 additions & 0 deletions banyand/kv/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ type Store interface {
io.Closer
writer
Reader
SizeOnDisk() int64
}

// TimeSeriesReader allows retrieving data from a time-series storage.
Expand All @@ -86,6 +87,7 @@ type TimeSeriesStore interface {
io.Closer
Handover(skl *skl.Skiplist) error
TimeSeriesReader
SizeOnDisk() int64
}

// TimeSeriesOptions sets options for creating a TimeSeriesStore.
Expand Down Expand Up @@ -163,6 +165,7 @@ type IndexStore interface {
Iterable
Reader
Close() error
SizeOnDisk() int64
}

// OpenTimeSeriesStore creates a new TimeSeriesStore.
Expand Down
33 changes: 30 additions & 3 deletions banyand/liaison/grpc/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,18 @@ package grpc
import (
"context"
"net"
"runtime/debug"
"time"

grpc_validator "github.com/grpc-ecosystem/go-grpc-middleware/validator"
"github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/recovery"
grpc_validator "github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/validator"
"github.com/pkg/errors"
grpclib "google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/health"
"google.golang.org/grpc/health/grpc_health_v1"
"google.golang.org/grpc/status"

"github.com/apache/skywalking-banyandb/api/event"
databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
Expand All @@ -37,6 +41,7 @@ import (
streamv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/stream/v1"
"github.com/apache/skywalking-banyandb/banyand/discovery"
"github.com/apache/skywalking-banyandb/banyand/metadata"
"github.com/apache/skywalking-banyandb/banyand/observability"
"github.com/apache/skywalking-banyandb/banyand/queue"
"github.com/apache/skywalking-banyandb/pkg/bus"
"github.com/apache/skywalking-banyandb/pkg/logger"
Expand Down Expand Up @@ -184,9 +189,31 @@ func (s *server) Serve() run.StopNotify {
if s.tls {
opts = []grpclib.ServerOption{grpclib.Creds(s.creds)}
}
// grpcPanicRecoveryHandler logs a recovered panic value together with the
// current stack trace and converts it into a codes.Internal status error.
grpcPanicRecoveryHandler := func(p any) (err error) {
	s.log.Error().Interface("panic", p).Str("stack", string(debug.Stack())).Msg("recovered from panic")

	// p can be any value passed to panic. %v renders every type, whereas %s
	// prints "%!s(int=42)"-style noise for values that are not strings,
	// errors or Stringers. Output for strings and errors is unchanged.
	return status.Errorf(codes.Internal, "%v", p)
}

unaryMetrics, streamMetrics := observability.MetricsServerInterceptor()
streamChain := []grpclib.StreamServerInterceptor{
grpc_validator.StreamServerInterceptor(),
recovery.StreamServerInterceptor(recovery.WithRecoveryHandler(grpcPanicRecoveryHandler)),
}
if streamMetrics != nil {
streamChain = append(streamChain, streamMetrics)
}
unaryChain := []grpclib.UnaryServerInterceptor{
grpc_validator.UnaryServerInterceptor(),
recovery.UnaryServerInterceptor(recovery.WithRecoveryHandler(grpcPanicRecoveryHandler)),
}
if unaryMetrics != nil {
unaryChain = append(unaryChain, unaryMetrics)
}

opts = append(opts, grpclib.MaxRecvMsgSize(s.maxRecvMsgSize),
grpclib.UnaryInterceptor(grpc_validator.UnaryServerInterceptor()),
grpclib.StreamInterceptor(grpc_validator.StreamServerInterceptor()),
grpclib.ChainUnaryInterceptor(unaryChain...),
grpclib.ChainStreamInterceptor(streamChain...),
)
s.ser = grpclib.NewServer(opts...)

Expand Down
8 changes: 5 additions & 3 deletions banyand/measure/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,10 +240,12 @@ func (s *supplier) OpenDB(groupSchema *commonv1.Group) (tsdb.Database, error) {
if opts.TTL, err = pb_v1.ToIntervalRule(groupSchema.ResourceOpts.Ttl); err != nil {
return nil, err
}

return tsdb.OpenDatabase(
context.WithValue(context.Background(), common.PositionKey, common.Position{
Module: "measure",
Database: name,
common.SetPosition(context.Background(), func(p common.Position) common.Position {
p.Module = "measure"
p.Database = name
return p
}),
opts)
}
51 changes: 51 additions & 0 deletions banyand/observability/collector.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
// Licensed to Apache Software Foundation (ASF) under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Apache Software Foundation (ASF) licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package observability

import (
"sync"
)

// MetricsCollector is a global metrics collector.
var MetricsCollector = Collector{
	getters: make(map[string]MetricsGetter),
}

// MetricsGetter is a function that collects metrics.
type MetricsGetter func()

// Collector is a metrics collector. It keeps one MetricsGetter per name and
// guards the registry with a read/write mutex. The zero value is ready to use.
type Collector struct {
	getters map[string]MetricsGetter
	gMux    sync.RWMutex
}

// Register registers a metrics getter under name, replacing any getter that
// was previously registered with the same name.
func (c *Collector) Register(name string, getter MetricsGetter) {
	c.gMux.Lock()
	defer c.gMux.Unlock()
	// Create the registry lazily so that a zero-value Collector does not
	// panic on a write to a nil map.
	if c.getters == nil {
		c.getters = make(map[string]MetricsGetter)
	}
	c.getters[name] = getter
}

// collect invokes every registered getter once, in map iteration order.
//
// Getters run while the read lock is held, so a getter must not call
// Register on the same Collector: that would block on the write lock.
func (c *Collector) collect() {
	c.gMux.RLock()
	defer c.gMux.RUnlock()
	for _, getter := range c.getters {
		// A nil getter may have been registered; calling it would panic.
		if getter == nil {
			continue
		}
		getter()
	}
}
14 changes: 14 additions & 0 deletions banyand/observability/meter_noop.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,24 @@
package observability

import (
"google.golang.org/grpc"

"github.com/apache/skywalking-banyandb/pkg/meter"
"github.com/apache/skywalking-banyandb/pkg/run"
)

// NewMetricService is the no-op variant of the metric service constructor.
// It runs a single collection pass over MetricsCollector and returns a nil
// run.Service, so callers should check the result for nil before using it.
// NOTE(review): the collect call presumably only keeps collect referenced in
// this build variant — confirm.
func NewMetricService() run.Service {
	var svc run.Service
	MetricsCollector.collect()
	return svc
}

// NewMeterProvider ignores the given scope and always returns a
// meter.NoopProvider.
func NewMeterProvider(_ meter.Scope) meter.Provider {
	var provider meter.NoopProvider
	return provider
}

// MetricsServerInterceptor returns the unary and stream server interceptors
// used for metrics. In this no-op variant both are nil, so callers must
// nil-check them before adding them to an interceptor chain.
func MetricsServerInterceptor() (grpc.UnaryServerInterceptor, grpc.StreamServerInterceptor) {
	var (
		unary  grpc.UnaryServerInterceptor
		stream grpc.StreamServerInterceptor
	)
	return unary, stream
}
Loading

0 comments on commit 49132fa

Please sign in to comment.