Skip to content

Commit 26128a7

Browse files
poonaimangalaman93
authored andcommitted
Add new API Subscribe to listen on update for the specified prefix (#771)
Signed-off-by: பாலாஜி <rbalajis25@gmail.com>
1 parent 364f6f2 commit 26128a7

File tree

7 files changed

+414
-3
lines changed

7 files changed

+414
-3
lines changed

badger/cmd/bank.go

Lines changed: 54 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@ var duration string
6868
var stopAll int32
6969
var mmap bool
7070
var checkStream bool
71+
var checkSubscriber bool
7172

7273
const keyPrefix = "account:"
7374

@@ -87,6 +88,9 @@ func init() {
8788
bankTest.Flags().BoolVarP(&checkStream, "check_stream", "s", false,
8889
"If true, the test will send transactions to another badger instance via the stream "+
8990
"interface in order to verify that all data is streamed correctly.")
91+
bankTest.Flags().BoolVarP(&checkSubscriber, "check_subscriber", "w", false,
92+
"If true, the test will send transactions to another badger instance via the subscriber "+
93+
"interface in order to verify that all the data is published correctly.")
9094

9195
bankDisect.Flags().IntVarP(&numPrevious, "previous", "p", 12,
9296
"Starting from the violation txn, how many previous versions to retrieve.")
@@ -347,6 +351,24 @@ func runTest(cmd *cobra.Command, args []string) error {
347351
defer db.Close()
348352

349353
var tmpDb *badger.DB
354+
var subscribeDB *badger.DB
355+
if checkSubscriber {
356+
dir, err := ioutil.TempDir("", "bank_subscribe")
357+
y.Check(err)
358+
359+
subscribeOpts := badger.DefaultOptions
360+
subscribeOpts.Dir = dir
361+
subscribeOpts.ValueDir = dir
362+
subscribeOpts.SyncWrites = false
363+
log.Printf("Opening subscribe DB with options: %+v\n", subscribeOpts)
364+
365+
subscribeDB, err = badger.Open(subscribeOpts)
366+
if err != nil {
367+
return err
368+
}
369+
defer subscribeDB.Close()
370+
}
371+
350372
if checkStream {
351373
dir, err := ioutil.TempDir("", "bank_stream")
352374
y.Check(err)
@@ -511,8 +533,39 @@ func runTest(cmd *cobra.Command, args []string) error {
511533
}))
512534
}
513535
}()
514-
wg.Wait()
515536

537+
ctx, cancel := context.WithCancel(context.Background())
538+
var subWg sync.WaitGroup
539+
if checkSubscriber {
540+
subWg.Add(1)
541+
go func() {
542+
defer subWg.Done()
543+
accountIDS := [][]byte{}
544+
for i := 0; i < numAccounts; i++ {
545+
accountIDS = append(accountIDS, key(i))
546+
}
547+
updater := func(kvs *pb.KVList) {
548+
batch := subscribeDB.NewWriteBatch()
549+
for _, kv := range kvs.GetKv() {
550+
y.Check(batch.Set(kv.Key, kv.Value, 0))
551+
}
552+
y.Check(batch.Flush())
553+
}
554+
db.Subscribe(ctx, updater, accountIDS[0], accountIDS[1:]...)
555+
}()
556+
}
557+
wg.Wait()
558+
cancel()
559+
subWg.Wait()
560+
y.Check(subscribeDB.View(func(txn *badger.Txn) error {
561+
_, err := seekTotal(txn)
562+
if err != nil {
563+
log.Printf("Error while calculating subscriber DB total: %v", err)
564+
} else {
565+
atomic.AddUint64(&reads, 1)
566+
}
567+
return nil
568+
}))
516569
if atomic.LoadInt32(&stopAll) == 0 {
517570
log.Println("Test OK")
518571
return nil

db.go

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ package badger
1818

1919
import (
2020
"bytes"
21+
"context"
2122
"encoding/binary"
2223
"encoding/hex"
2324
"expvar"
@@ -32,6 +33,7 @@ import (
3233
"time"
3334

3435
"github.com/dgraph-io/badger/options"
36+
"github.com/dgraph-io/badger/pb"
3537
"github.com/dgraph-io/badger/skl"
3638
"github.com/dgraph-io/badger/table"
3739
"github.com/dgraph-io/badger/y"
@@ -54,8 +56,11 @@ type closers struct {
5456
memtable *y.Closer
5557
writes *y.Closer
5658
valueGC *y.Closer
59+
pub *y.Closer
5760
}
5861

62+
type callback func(kv *pb.KVList)
63+
5964
// DB provides the various functions required to interact with Badger.
6065
// DB is thread-safe.
6166
type DB struct {
@@ -85,6 +90,8 @@ type DB struct {
8590
blockWrites int32
8691

8792
orc *oracle
93+
94+
pub *publisher
8895
}
8996

9097
const (
@@ -267,6 +274,7 @@ func Open(opt Options) (db *DB, err error) {
267274
dirLockGuard: dirLockGuard,
268275
valueDirGuard: valueDirLockGuard,
269276
orc: newOracle(opt),
277+
pub: newPublisher(),
270278
}
271279

272280
// Calculate initial size.
@@ -323,6 +331,9 @@ func Open(opt Options) (db *DB, err error) {
323331
db.closers.valueGC = y.NewCloser(1)
324332
go db.vlog.waitOnGC(db.closers.valueGC)
325333

334+
db.closers.pub = y.NewCloser(1)
335+
go db.pub.listenForUpdates(db.closers.pub)
336+
326337
valueDirLockGuard = nil
327338
dirLockGuard = nil
328339
manifestFile = nil
@@ -342,6 +353,8 @@ func (db *DB) Close() (err error) {
342353
// Stop writes next.
343354
db.closers.writes.SignalAndWait()
344355

356+
db.closers.pub.SignalAndWait()
357+
345358
// Now close the value log.
346359
if vlogErr := db.vlog.Close(); vlogErr != nil {
347360
err = errors.Wrap(vlogErr, "DB.Close")
@@ -610,6 +623,8 @@ func (db *DB) writeRequests(reqs []*request) error {
610623
return err
611624
}
612625

626+
db.elog.Printf("Sending updates to subscribers")
627+
db.pub.sendUpdates(reqs)
613628
db.elog.Printf("Writing to memtable")
614629
var count int
615630
for _, b := range reqs {
@@ -662,6 +677,8 @@ func (db *DB) sendToWriteCh(entries []*Entry) (*request, error) {
662677
req.Entries = entries
663678
req.Wg = sync.WaitGroup{}
664679
req.Wg.Add(1)
680+
req.IncrRef() // for db write
681+
req.IncrRef() // for publisher updates
665682
db.writeCh <- req // Handled in doWrites.
666683
y.NumPuts.Add(int64(len(entries)))
667684

@@ -1409,3 +1426,46 @@ func (db *DB) DropPrefix(prefix []byte) error {
14091426
db.opt.Infof("DropPrefix done")
14101427
return nil
14111428
}
1429+
1430+
// Subscribe can be used watch key changes for the given key prefix.
1431+
func (db *DB) Subscribe(ctx context.Context, cb callback, prefix []byte, prefixes ...[]byte) error {
1432+
if cb == nil {
1433+
return ErrNilCallback
1434+
}
1435+
prefixes = append(prefixes, prefix)
1436+
c := y.NewCloser(1)
1437+
recvCh, id := db.pub.newSubscriber(c, prefixes...)
1438+
slurp := func(batch *pb.KVList) {
1439+
defer func() {
1440+
if len(batch.GetKv()) > 0 {
1441+
cb(batch)
1442+
}
1443+
}()
1444+
for {
1445+
select {
1446+
case kvs := <-recvCh:
1447+
batch.Kv = append(batch.Kv, kvs.Kv...)
1448+
default:
1449+
return
1450+
}
1451+
}
1452+
}
1453+
for {
1454+
select {
1455+
case <-c.HasBeenClosed():
1456+
slurp(new(pb.KVList))
1457+
// Drain if any pending updates.
1458+
c.Done()
1459+
// No need to delete here. Closer will be called only while
1460+
// closing DB. Subscriber will be deleted by cleanSubscribers.
1461+
return nil
1462+
case <-ctx.Done():
1463+
c.Done()
1464+
db.pub.deleteSubscriber(id)
1465+
// Delete the subscriber to avoid further updates.
1466+
return ctx.Err()
1467+
case batch := <-recvCh:
1468+
slurp(batch)
1469+
}
1470+
}
1471+
}

db_test.go

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ package badger
1818

1919
import (
2020
"bytes"
21+
"context"
2122
"encoding/binary"
2223
"flag"
2324
"fmt"
@@ -35,6 +36,7 @@ import (
3536
"time"
3637

3738
"github.com/dgraph-io/badger/options"
39+
"github.com/dgraph-io/badger/pb"
3840
"github.com/dgraph-io/badger/skl"
3941

4042
"github.com/dgraph-io/badger/y"
@@ -1514,10 +1516,31 @@ func TestGoroutineLeak(t *testing.T) {
15141516
t.Logf("Num go: %d", before)
15151517
for i := 0; i < 12; i++ {
15161518
runBadgerTest(t, nil, func(t *testing.T, db *DB) {
1519+
updated := false
1520+
ctx, cancel := context.WithCancel(context.Background())
1521+
var wg sync.WaitGroup
1522+
wg.Add(1)
1523+
var subWg sync.WaitGroup
1524+
subWg.Add(1)
1525+
go func() {
1526+
subWg.Done()
1527+
err := db.Subscribe(ctx, func(kvs *pb.KVList) {
1528+
require.Equal(t, []byte("value"), kvs.Kv[0].GetValue())
1529+
updated = true
1530+
wg.Done()
1531+
}, []byte("key"))
1532+
if err != nil {
1533+
require.Equal(t, err.Error(), context.Canceled.Error())
1534+
}
1535+
}()
1536+
subWg.Wait()
15171537
err := db.Update(func(txn *Txn) error {
15181538
return txn.Set([]byte("key"), []byte("value"))
15191539
})
15201540
require.NoError(t, err)
1541+
wg.Wait()
1542+
cancel()
1543+
require.Equal(t, true, updated)
15211544
})
15221545
}
15231546
require.Equal(t, before, runtime.NumGoroutine())

errors.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,4 +102,7 @@ var (
102102
// ErrBlockedWrites is returned if the user called DropAll. During the process of dropping all
103103
// data from Badger, we stop accepting new writes, by returning this error.
104104
ErrBlockedWrites = errors.New("Writes are blocked, possibly due to DropAll or Close")
105+
106+
// ErrNilCallback is returned when subscriber's callback is nil.
107+
ErrNilCallback = errors.New("Callback cannot be nil")
105108
)

0 commit comments

Comments
 (0)