Skip to content

Commit

Permalink
Update the Store and Cursor interfaces to accept Context and return e…
Browse files Browse the repository at this point in the history
…rrors (#1084)

* Add error value to the Store and Cursor interface methods.
* Add context to Store and Cursor interface methods
* Unify logging in tests
  • Loading branch information
dlsniper committed Nov 11, 2022
1 parent 9e58234 commit 06a8ecc
Show file tree
Hide file tree
Showing 20 changed files with 485 additions and 324 deletions.
10 changes: 7 additions & 3 deletions chain/beacon/callbacks_test.go
@@ -1,18 +1,22 @@
package beacon

import (
"context"
"testing"
"time"

"github.com/stretchr/testify/require"

"github.com/drand/drand/chain"
"github.com/drand/drand/chain/boltdb"
"github.com/drand/drand/test"
)

func TestStoreCallback(t *testing.T) {
dir := t.TempDir()
bbstore, err := boltdb.NewBoltStore(dir, nil)
ctx := context.Background()
l := test.Logger(t)
bbstore, err := boltdb.NewBoltStore(l, dir, nil)
require.NoError(t, err)
cb := NewCallbackStore(bbstore)
id1 := "superid"
Expand All @@ -21,12 +25,12 @@ func TestStoreCallback(t *testing.T) {
doneCh <- true
})

cb.Put(&chain.Beacon{
cb.Put(ctx, &chain.Beacon{
Round: 1,
})
require.True(t, checkOne(doneCh))
cb.AddCallback(id1, func(*chain.Beacon) {})
cb.Put(&chain.Beacon{
cb.Put(ctx, &chain.Beacon{
Round: 1,
})
require.False(t, checkOne(doneCh))
Expand Down
6 changes: 3 additions & 3 deletions chain/beacon/chain.go
Expand Up @@ -98,7 +98,7 @@ func (c *chainStore) NewValidPartial(addr string, p *drand.PartialBeaconPacket)

func (c *chainStore) Stop() {
c.syncm.Stop()
c.CallbackStore.Close()
c.CallbackStore.Close(context.Background())
close(c.done)
}

Expand All @@ -110,7 +110,7 @@ var partialCacheStoreLimit = 3
// runAggregator runs a continuous loop that tries to aggregate partial
// signatures when it can
func (c *chainStore) runAggregator() {
lastBeacon, err := c.Last()
lastBeacon, err := c.Last(context.Background())
if err != nil {
c.l.Fatalw("", "chain_aggregator", "loading", "last_beacon", err)
}
Expand Down Expand Up @@ -194,7 +194,7 @@ func (c *chainStore) tryAppend(last, newB *chain.Beacon) bool {
return false
}

if err := c.CallbackStore.Put(newB); err != nil {
if err := c.CallbackStore.Put(context.Background(), newB); err != nil {
// if round is ok but bytes are different, error will be raised
c.l.Errorw("", "chain_store", "error storing beacon", "err", err)
return false
Expand Down
4 changes: 2 additions & 2 deletions chain/beacon/node.go
Expand Up @@ -73,7 +73,7 @@ func NewHandler(c net.ProtocolClient, s chain.Store, conf *Config, l log.Logger,
addr := conf.Public.Address()
crypto := newCryptoStore(conf.Group, conf.Share)
// insert genesis beacon
if err := s.Put(chain.GenesisBeacon(crypto.chain)); err != nil {
if err := s.Put(context.Background(), chain.GenesisBeacon(crypto.chain)); err != nil {
return nil, err
}

Expand Down Expand Up @@ -315,7 +315,7 @@ func (h *Handler) run(startTime int64) {
h.Unlock()
})

lastBeacon, err := h.chain.Last()
lastBeacon, err := h.chain.Last(context.Background())
if err != nil {
h.l.Errorw("", "beacon_loop", "loading_last", "err", err)
break
Expand Down
19 changes: 10 additions & 9 deletions chain/beacon/node_test.go
Expand Up @@ -191,7 +191,9 @@ func (b *BeaconTest) CreateNode(t *testing.T, i int) {
node.private = priv
keyShare := findShare(idx)
node.shares = keyShare
store, err := boltdb.NewBoltStore(b.paths[idx], nil)

l := test.Logger(t)
store, err := boltdb.NewBoltStore(l, b.paths[idx], nil)
if err != nil {
panic(err)
}
Expand Down Expand Up @@ -367,7 +369,7 @@ func createBoltStores(prefix string, n int) []string {
return paths
}

func checkWait(counter *sync.WaitGroup) {
func checkWait(t *testing.T, counter *sync.WaitGroup) {
var doneCh = make(chan bool, 1)
go func() {
counter.Wait()
Expand All @@ -376,9 +378,8 @@ func checkWait(counter *sync.WaitGroup) {
select {
case <-doneCh:
break

case <-time.After(30 * time.Second):
panic("outdated beacon time")
t.Fatal("outdated beacon time")
}
}

Expand Down Expand Up @@ -409,7 +410,7 @@ func TestBeaconSync(t *testing.T) {
doRound := func(count int, move time.Duration) {
counter.Add(count)
bt.MoveTime(t, move)
checkWait(counter)
checkWait(t, counter)
}

t.Log("serving beacons")
Expand Down Expand Up @@ -513,11 +514,11 @@ func TestBeaconSimple(t *testing.T) {
bt.MoveTime(t, 1*time.Second)

// check 1 period
checkWait(counter)
checkWait(t, counter)
// check 2 period
counter.Add(n)
bt.MoveTime(t, period)
checkWait(counter)
checkWait(t, counter)
}

func TestBeaconThreshold(t *testing.T) {
Expand Down Expand Up @@ -557,7 +558,7 @@ func TestBeaconThreshold(t *testing.T) {
currentRound++
counter.Add(howMany)
bt.MoveTime(t, period)
checkWait(&counter)
checkWait(t, &counter)
time.Sleep(100 * time.Millisecond)
}
}()
Expand All @@ -576,7 +577,7 @@ func TestBeaconThreshold(t *testing.T) {
currentRound = 1
counter.Add(n - 1)
bt.MoveTime(t, offsetGenesis)
checkWait(&counter)
checkWait(t, &counter)

// make a few rounds
makeRounds(nRounds, n-1)
Expand Down
29 changes: 15 additions & 14 deletions chain/beacon/store.go
Expand Up @@ -2,6 +2,7 @@ package beacon

import (
"bytes"
"context"
"fmt"
"runtime"
"sync"
Expand Down Expand Up @@ -34,20 +35,20 @@ type appendStore struct {
}

func newAppendStore(s chain.Store) chain.Store {
last, _ := s.Last()
last, _ := s.Last(context.Background())
return &appendStore{
Store: s,
last: last,
}
}

func (a *appendStore) Put(b *chain.Beacon) error {
func (a *appendStore) Put(ctx context.Context, b *chain.Beacon) error {
a.Lock()
defer a.Unlock()
if b.Round != a.last.Round+1 {
return fmt.Errorf("invalid round inserted: last %d, new %d", a.last.Round, b.Round)
}
if err := a.Store.Put(b); err != nil {
if err := a.Store.Put(ctx, b); err != nil {
return err
}
a.last = b
Expand All @@ -63,15 +64,15 @@ type schemeStore struct {
}

func NewSchemeStore(s chain.Store, sch scheme.Scheme) chain.Store {
last, _ := s.Last()
last, _ := s.Last(context.Background())
return &schemeStore{
Store: s,
last: last,
sch: sch,
}
}

func (a *schemeStore) Put(b *chain.Beacon) error {
func (a *schemeStore) Put(ctx context.Context, b *chain.Beacon) error {
a.Lock()
defer a.Unlock()

Expand All @@ -81,13 +82,13 @@ func (a *schemeStore) Put(b *chain.Beacon) error {
if a.sch.DecouplePrevSig {
b.PreviousSig = nil
} else if !bytes.Equal(a.last.Signature, b.PreviousSig) {
if pb, err := a.Get(b.Round - 1); err != nil || !bytes.Equal(pb.Signature, b.PreviousSig) {
if pb, err := a.Get(ctx, b.Round-1); err != nil || !bytes.Equal(pb.Signature, b.PreviousSig) {
return fmt.Errorf("invalid previous signature for %d or "+
"previous beacon not found in database. Err: %w", b.Round, err)
}
}

if err := a.Store.Put(b); err != nil {
if err := a.Store.Put(ctx, b); err != nil {
return err
}

Expand All @@ -112,8 +113,8 @@ func newDiscrepancyStore(s chain.Store, l log.Logger, group *key.Group, cl clock
}
}

func (d *discrepancyStore) Put(b *chain.Beacon) error {
if err := d.Store.Put(b); err != nil {
func (d *discrepancyStore) Put(ctx context.Context, b *chain.Beacon) error {
if err := d.Store.Put(ctx, b); err != nil {
return err
}

Expand Down Expand Up @@ -160,8 +161,8 @@ func NewCallbackStore(s chain.Store) CallbackStore {
}

// Put stores a new beacon
func (c *callbackStore) Put(b *chain.Beacon) error {
if err := c.Store.Put(b); err != nil {
func (c *callbackStore) Put(ctx context.Context, b *chain.Beacon) error {
if err := c.Store.Put(ctx, b); err != nil {
return err
}
if b.Round != 0 {
Expand Down Expand Up @@ -190,9 +191,9 @@ func (c *callbackStore) RemoveCallback(id string) {
delete(c.callbacks, id)
}

func (c *callbackStore) Close() {
c.Store.Close()
close(c.done)
func (c *callbackStore) Close(ctx context.Context) error {
defer close(c.done)
return c.Store.Close(ctx)
}

func (c *callbackStore) runWorkers(n int) {
Expand Down
14 changes: 9 additions & 5 deletions chain/beacon/store_test.go
Expand Up @@ -2,25 +2,29 @@ package beacon

import (
"bytes"
"context"
"testing"

"github.com/stretchr/testify/require"

"github.com/drand/drand/chain"
"github.com/drand/drand/chain/boltdb"
"github.com/drand/drand/common/scheme"
"github.com/drand/drand/test"
)

func TestSchemeStore(t *testing.T) {
sch, _ := scheme.ReadSchemeByEnv()

dir := t.TempDir()
ctx := context.Background()

bstore, err := boltdb.NewBoltStore(dir, nil)
l := test.Logger(t)
bstore, err := boltdb.NewBoltStore(l, dir, nil)
require.NoError(t, err)

genesisBeacon := chain.GenesisBeacon(&chain.Info{GenesisSeed: []byte("genesis_signature")})
err = bstore.Put(genesisBeacon)
err = bstore.Put(ctx, genesisBeacon)
require.NoError(t, err)

ss := NewSchemeStore(bstore, sch)
Expand All @@ -30,10 +34,10 @@ func TestSchemeStore(t *testing.T) {
Signature: []byte("signature_1"),
PreviousSig: []byte("genesis_signature"),
}
err = ss.Put(newBeacon)
err = ss.Put(ctx, newBeacon)
require.NoError(t, err)

beaconSaved, err := ss.Last()
beaconSaved, err := ss.Last(ctx)
require.NoError(t, err)

// test if store sets to nil prev signature depending on scheme
Expand All @@ -51,7 +55,7 @@ func TestSchemeStore(t *testing.T) {
PreviousSig: nil,
}

err = ss.Put(newBeacon)
err = ss.Put(ctx, newBeacon)

// test if store checks consistency between signature and prev signature depending on the scheme
if sch.DecouplePrevSig && err != nil {
Expand Down

0 comments on commit 06a8ecc

Please sign in to comment.