Skip to content

Commit

Permalink
Fix unstopped loop
Browse files Browse the repository at this point in the history
Signed-off-by: Gao Hongtao <hanahmily@gmail.com>
  • Loading branch information
hanahmily committed Apr 6, 2023
1 parent d8f033d commit b10afd2
Show file tree
Hide file tree
Showing 7 changed files with 50 additions and 35 deletions.
4 changes: 2 additions & 2 deletions banyand/metadata/schema/checker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ var _ = ginkgo.Describe("Utils", func() {
})

ginkgo.AfterEach(func() {
Eventually(gleak.Goroutines()).ShouldNot(gleak.HaveLeaked(goods))
Eventually(gleak.Goroutines, flags.EventuallyTimeout).ShouldNot(gleak.HaveLeaked(goods))
})

ginkgo.It("should be equal if nothing changed", func() {
Expand Down Expand Up @@ -188,7 +188,7 @@ var _ = ginkgo.Describe("Utils", func() {
})

ginkgo.AfterEach(func() {
Eventually(gleak.Goroutines()).ShouldNot(gleak.HaveLeaked(goods))
Eventually(gleak.Goroutines, flags.EventuallyTimeout).ShouldNot(gleak.HaveLeaked(goods))
})

ginkgo.It("should be equal if nothing changed", func() {
Expand Down
4 changes: 2 additions & 2 deletions banyand/metadata/schema/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,15 @@ import (
var (
// ErrGRPCResourceNotFound indicates the resource doesn't exist.
ErrGRPCResourceNotFound = statusGRPCResourceNotFound.Err()
// ErrClosed indicates the registry is closed.
ErrClosed = errors.New("metadata registry is closed")

statusGRPCInvalidArgument = status.New(codes.InvalidArgument, "banyandb: input is invalid")
statusGRPCResourceNotFound = status.New(codes.NotFound, "banyandb: resource not found")
statusGRPCAlreadyExists = status.New(codes.AlreadyExists, "banyandb: resource already exists")
errGRPCAlreadyExists = statusGRPCAlreadyExists.Err()
statusDataLoss = status.New(codes.DataLoss, "banyandb: resource corrupts.")
errGRPCDataLoss = statusDataLoss.Err()

errClosed = errors.New("metadata registry is closed")
)

// BadRequest creates a gRPC error with error details with type BadRequest,
Expand Down
10 changes: 5 additions & 5 deletions banyand/metadata/schema/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ func NewEtcdSchemaRegistry(options ...RegistryOption) (Registry, error) {

func (e *etcdSchemaRegistry) get(ctx context.Context, key string, message proto.Message) error {
if !e.closer.AddRunning() {
return errClosed
return ErrClosed
}
defer e.closer.Done()
resp, err := e.client.Get(ctx, key)
Expand Down Expand Up @@ -229,7 +229,7 @@ func (e *etcdSchemaRegistry) get(ctx context.Context, key string, message proto.
// Otherwise, it will return ErrGRPCResourceNotFound.
func (e *etcdSchemaRegistry) update(ctx context.Context, metadata Metadata) error {
if !e.closer.AddRunning() {
return errClosed
return ErrClosed
}
defer e.closer.Done()
key, err := metadata.key()
Expand Down Expand Up @@ -281,7 +281,7 @@ func (e *etcdSchemaRegistry) update(ctx context.Context, metadata Metadata) erro
// Otherwise, it will return ErrGRPCAlreadyExists.
func (e *etcdSchemaRegistry) create(ctx context.Context, metadata Metadata) error {
if !e.closer.AddRunning() {
return errClosed
return ErrClosed
}
defer e.closer.Done()
key, err := metadata.key()
Expand Down Expand Up @@ -314,7 +314,7 @@ func (e *etcdSchemaRegistry) create(ctx context.Context, metadata Metadata) erro

func (e *etcdSchemaRegistry) listWithPrefix(ctx context.Context, prefix string, factory func() proto.Message) ([]proto.Message, error) {
if !e.closer.AddRunning() {
return nil, errClosed
return nil, ErrClosed
}
defer e.closer.Done()
resp, err := e.client.Get(ctx, prefix, clientv3.WithFromKey(), clientv3.WithRange(incrementLastByte(prefix)))
Expand Down Expand Up @@ -343,7 +343,7 @@ func listPrefixesForEntity(group, entityPrefix string) string {

func (e *etcdSchemaRegistry) delete(ctx context.Context, metadata Metadata) (bool, error) {
if !e.closer.AddRunning() {
return false, errClosed
return false, ErrClosed
}
defer e.closer.Done()
key, err := metadata.key()
Expand Down
20 changes: 3 additions & 17 deletions banyand/tsdb/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -284,29 +284,15 @@ func (b *block) delegate(ctx context.Context) (blockDelegate, error) {
}

func (b *block) incRef() bool {
loop:
if b.Closed() {
return false
}
r := b.ref.Load()
if b.ref.CompareAndSwap(r, r+1) {
return true
}
runtime.Gosched()
goto loop
b.ref.Add(1)
return true
}

func (b *block) Done() {
loop:
r := b.ref.Load()
if r < 1 {
return
}
if b.ref.CompareAndSwap(r, r-1) {
return
}
runtime.Gosched()
goto loop
b.ref.Add(-1)
}

func (b *block) waitDone(stopped *atomic.Bool) <-chan struct{} {
Expand Down
27 changes: 19 additions & 8 deletions banyand/tsdb/buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,9 @@ func (b *Buffer) Read(key []byte, ts time.Time) ([]byte, bool) {
keyWithTS := y.KeyWithTs(key, uint64(ts.UnixNano()))
index := b.getShardIndex(key)
epoch := uint64(ts.UnixNano())
for _, bk := range b.buckets[index].getAll() {
ll, deferFn := b.buckets[index].getAll()
defer deferFn()
for _, bk := range ll {
value := bk.Get(keyWithTS)
if value.Meta == 0 && value.Value == nil {
continue
Expand Down Expand Up @@ -167,16 +169,21 @@ func (b *Buffer) Stats() ([]string, []observability.Statistics) {
}
names := make([]string, b.numShards)
stats := make([]observability.Statistics, b.numShards)
for i := 0; i < b.numShards; i++ {
names[i] = fmt.Sprintf("buffer-%d", i)
var size, maxSize int64
for _, l := range b.buckets[i].getAll() {
size := func(bucket *bufferShardBucket) (size int64, maxSize int64) {
ll, deferFn := bucket.getAll()
defer deferFn()
for _, l := range ll {
if l == nil {
continue
}
size += l.MemSize()
maxSize += int64(b.buckets[i].capacity)
maxSize += int64(bucket.capacity)
}
return
}
for i := 0; i < b.numShards; i++ {
names[i] = fmt.Sprintf("buffer-%d", i)
size, maxSize := size(&b.buckets[i])
stats[i] = observability.Statistics{
MemBytes: size,
MaxMemBytes: maxSize,
Expand All @@ -189,7 +196,7 @@ func (b *Buffer) getShardIndex(key []byte) uint64 {
return convert.Hash(key) % uint64(b.numShards)
}

func (bsb *bufferShardBucket) getAll() []*skl.Skiplist {
func (bsb *bufferShardBucket) getAll() ([]*skl.Skiplist, func()) {
bsb.mutex.RLock()
defer bsb.mutex.RUnlock()
allList := make([]*skl.Skiplist, len(bsb.immutables)+1)
Expand All @@ -200,7 +207,11 @@ func (bsb *bufferShardBucket) getAll() []*skl.Skiplist {
allList[i+1] = bsb.immutables[last-i]
bsb.immutables[last-i].IncrRef()
}
return allList
return allList, func() {
for _, l := range allList {
l.DecrRef()
}
}
}

func (bsb *bufferShardBucket) start(onFlushFn onFlush) {
Expand Down
17 changes: 17 additions & 0 deletions pkg/run/closer.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import (
"sync"
)

var dummyCloserChan <-chan struct{}

// Closer can close a goroutine then wait for it to stop.
type Closer struct {
ctx context.Context
Expand All @@ -41,6 +43,9 @@ func NewCloser(initial int) *Closer {

// AddRunning adds a running task.
func (c *Closer) AddRunning() bool {
if c == nil {
return false
}
c.lock.RLock()
defer c.lock.RUnlock()
if c.closed {
Expand All @@ -52,16 +57,25 @@ func (c *Closer) AddRunning() bool {

// CloseNotify receives a signal from Close.
func (c *Closer) CloseNotify() <-chan struct{} {
if c == nil {
return dummyCloserChan
}
return c.ctx.Done()
}

// Done notifies that one task is done.
func (c *Closer) Done() {
if c == nil {
return
}
c.waiting.Done()
}

// CloseThenWait closes all tasks then waits till they are done.
func (c *Closer) CloseThenWait() {
if c == nil {
return
}
c.cancel()
c.lock.Lock()
c.closed = true
Expand All @@ -71,6 +85,9 @@ func (c *Closer) CloseThenWait() {

// Closed returns whether the Closer is closed.
func (c *Closer) Closed() bool {
if c == nil {
return true
}
c.lock.RLock()
defer c.lock.RUnlock()
return c.closed
Expand Down
3 changes: 2 additions & 1 deletion pkg/schema/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
"github.com/apache/skywalking-banyandb/banyand/discovery"
"github.com/apache/skywalking-banyandb/banyand/metadata"
"github.com/apache/skywalking-banyandb/banyand/metadata/schema"
"github.com/apache/skywalking-banyandb/banyand/tsdb"
"github.com/apache/skywalking-banyandb/pkg/bus"
"github.com/apache/skywalking-banyandb/pkg/logger"
Expand Down Expand Up @@ -196,7 +197,7 @@ func (sr *schemaRepo) Watcher() {
err = sr.deleteResource(evt.Metadata)
}
}
if err != nil {
if err != nil && !errors.Is(err, schema.ErrClosed) {
sr.l.Err(err).Interface("event", evt).Msg("fail to handle the metadata event. retry...")
select {
case sr.eventCh <- evt:
Expand Down

0 comments on commit b10afd2

Please sign in to comment.