Skip to content

Commit

Permalink
Integrating Sharded Buffer into TSDB (#263)
Browse files Browse the repository at this point in the history
* Integrating Sharded Buffer into TSDB. The Sharded Buffer will replace Badger's memtable for data ingestion.
 Currently, Badger KV only provides SST.

* Sync e2e cases with the main repo.

* Fix buffer crashing

* Clear volume counter after flushing

* Fix unstopped loop

---------

Signed-off-by: Gao Hongtao <hanahmily@gmail.com>
  • Loading branch information
hanahmily committed Apr 6, 2023
1 parent d1426ec commit 1f5a645
Show file tree
Hide file tree
Showing 31 changed files with 384 additions and 167 deletions.
2 changes: 1 addition & 1 deletion CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ Release Notes.
- Add TSDB concept document.
- [UI] Add YAML editor for inputting query criteria.
- Refactor TopN to support `NULL` group while keeping seriesID from the source measure.
- Add a sharded buffer to TSDB.
- Add a sharded buffer to TSDB to replace Badger's memtable. Badger KV only provides SST.

### Chores

Expand Down
2 changes: 1 addition & 1 deletion banyand/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ EXPOSE 2345

ENTRYPOINT ["air"]

FROM golang:1.19 AS base
FROM golang:1.20 AS base

ENV GOPATH "/go"
ENV GO111MODULE "on"
Expand Down
5 changes: 5 additions & 0 deletions banyand/kv/badger.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (

"github.com/dgraph-io/badger/v3"
"github.com/dgraph-io/badger/v3/banyandb"
"github.com/dgraph-io/badger/v3/skl"
"github.com/dgraph-io/badger/v3/y"

"github.com/apache/skywalking-banyandb/banyand/observability"
Expand All @@ -49,6 +50,10 @@ type badgerTSS struct {
dbOpts badger.Options
}

func (b *badgerTSS) Handover(skl *skl.Skiplist) error {
return b.db.HandoverIterator(skl.NewUniIterator(false))
}

func (b *badgerTSS) Stats() (s observability.Statistics) {
return badgerStats(b.db)
}
Expand Down
13 changes: 3 additions & 10 deletions banyand/kv/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (

"github.com/dgraph-io/badger/v3"
"github.com/dgraph-io/badger/v3/options"
"github.com/dgraph-io/badger/v3/skl"
"github.com/pkg/errors"

"github.com/apache/skywalking-banyandb/banyand/observability"
Expand Down Expand Up @@ -76,15 +77,6 @@ type Store interface {
Reader
}

// TimeSeriesWriter allows writing to a time-series storage.
type TimeSeriesWriter interface {
// Put a value with a timestamp/version
Put(key, val []byte, ts uint64) error
// PutAsync a value with a timestamp/version asynchronously.
// Injected "f" func will notice the result of value write.
PutAsync(key, val []byte, ts uint64, f func(error)) error
}

// TimeSeriesReader allows retrieving data from a time-series storage.
type TimeSeriesReader interface {
// Get a value by its key and timestamp/version
Expand All @@ -95,7 +87,7 @@ type TimeSeriesReader interface {
type TimeSeriesStore interface {
observability.Observable
io.Closer
TimeSeriesWriter
Handover(skl *skl.Skiplist) error
TimeSeriesReader
}

Expand Down Expand Up @@ -191,6 +183,7 @@ func OpenTimeSeriesStore(path string, options ...TimeSeriesOptions) (TimeSeriesS
if btss.dbOpts.MemTableSize < 8<<20 {
btss.dbOpts = btss.dbOpts.WithValueThreshold(1 << 10)
}
btss.dbOpts = btss.dbOpts.WithInTable()
var err error
btss.db, err = badger.Open(btss.dbOpts)
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions banyand/measure/measure_write.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,9 +185,9 @@ func encodeFieldValue(fieldValue *modelv1.FieldValue) []byte {
case *modelv1.FieldValue_Float:
return convert.Float64ToBytes(fieldValue.GetFloat().GetValue())
case *modelv1.FieldValue_Str:
return []byte(fieldValue.GetStr().Value)
return []byte(fieldValue.GetStr().GetValue())
case *modelv1.FieldValue_BinaryData:
return fieldValue.GetBinaryData()
return bytes.Clone(fieldValue.GetBinaryData())
}
return nil
}
4 changes: 2 additions & 2 deletions banyand/metadata/schema/checker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ var _ = ginkgo.Describe("Utils", func() {
})

ginkgo.AfterEach(func() {
Eventually(gleak.Goroutines()).ShouldNot(gleak.HaveLeaked(goods))
Eventually(gleak.Goroutines, flags.EventuallyTimeout).ShouldNot(gleak.HaveLeaked(goods))
})

ginkgo.It("should be equal if nothing changed", func() {
Expand Down Expand Up @@ -188,7 +188,7 @@ var _ = ginkgo.Describe("Utils", func() {
})

ginkgo.AfterEach(func() {
Eventually(gleak.Goroutines()).ShouldNot(gleak.HaveLeaked(goods))
Eventually(gleak.Goroutines, flags.EventuallyTimeout).ShouldNot(gleak.HaveLeaked(goods))
})

ginkgo.It("should be equal if nothing changed", func() {
Expand Down
4 changes: 2 additions & 2 deletions banyand/metadata/schema/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,15 @@ import (
var (
// ErrGRPCResourceNotFound indicates the resource doesn't exist.
ErrGRPCResourceNotFound = statusGRPCResourceNotFound.Err()
// ErrClosed indicates the registry is closed.
ErrClosed = errors.New("metadata registry is closed")

statusGRPCInvalidArgument = status.New(codes.InvalidArgument, "banyandb: input is invalid")
statusGRPCResourceNotFound = status.New(codes.NotFound, "banyandb: resource not found")
statusGRPCAlreadyExists = status.New(codes.AlreadyExists, "banyandb: resource already exists")
errGRPCAlreadyExists = statusGRPCAlreadyExists.Err()
statusDataLoss = status.New(codes.DataLoss, "banyandb: resource corrupts.")
errGRPCDataLoss = statusDataLoss.Err()

errClosed = errors.New("metadata registry is closed")
)

// BadRequest creates a gRPC error with error details with type BadRequest,
Expand Down
10 changes: 5 additions & 5 deletions banyand/metadata/schema/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ func NewEtcdSchemaRegistry(options ...RegistryOption) (Registry, error) {

func (e *etcdSchemaRegistry) get(ctx context.Context, key string, message proto.Message) error {
if !e.closer.AddRunning() {
return errClosed
return ErrClosed
}
defer e.closer.Done()
resp, err := e.client.Get(ctx, key)
Expand Down Expand Up @@ -229,7 +229,7 @@ func (e *etcdSchemaRegistry) get(ctx context.Context, key string, message proto.
// Otherwise, it will return ErrGRPCResourceNotFound.
func (e *etcdSchemaRegistry) update(ctx context.Context, metadata Metadata) error {
if !e.closer.AddRunning() {
return errClosed
return ErrClosed
}
defer e.closer.Done()
key, err := metadata.key()
Expand Down Expand Up @@ -281,7 +281,7 @@ func (e *etcdSchemaRegistry) update(ctx context.Context, metadata Metadata) erro
// Otherwise, it will return ErrGRPCAlreadyExists.
func (e *etcdSchemaRegistry) create(ctx context.Context, metadata Metadata) error {
if !e.closer.AddRunning() {
return errClosed
return ErrClosed
}
defer e.closer.Done()
key, err := metadata.key()
Expand Down Expand Up @@ -314,7 +314,7 @@ func (e *etcdSchemaRegistry) create(ctx context.Context, metadata Metadata) erro

func (e *etcdSchemaRegistry) listWithPrefix(ctx context.Context, prefix string, factory func() proto.Message) ([]proto.Message, error) {
if !e.closer.AddRunning() {
return nil, errClosed
return nil, ErrClosed
}
defer e.closer.Done()
resp, err := e.client.Get(ctx, prefix, clientv3.WithFromKey(), clientv3.WithRange(incrementLastByte(prefix)))
Expand Down Expand Up @@ -343,7 +343,7 @@ func listPrefixesForEntity(group, entityPrefix string) string {

func (e *etcdSchemaRegistry) delete(ctx context.Context, metadata Metadata) (bool, error) {
if !e.closer.AddRunning() {
return false, errClosed
return false, ErrClosed
}
defer e.closer.Done()
key, err := metadata.key()
Expand Down
Loading

0 comments on commit 1f5a645

Please sign in to comment.