Skip to content

Commit

Permalink
pasive watch mode (#759)
Browse files Browse the repository at this point in the history
caching of passive client `Watch` stream when not in `autowatch` mode.
  • Loading branch information
willscott committed Oct 16, 2020
1 parent 112037f commit 7717801
Show file tree
Hide file tree
Showing 6 changed files with 119 additions and 11 deletions.
57 changes: 53 additions & 4 deletions client/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,18 @@ const (

// newWatchAggregator maintains state of consumers calling `Watch` so that a
// single `watch` request is made to the underlying client.
func newWatchAggregator(c Client, autoWatch bool, autoWatchRetry time.Duration) *watchAggregator {
// There are 3 modes taken by this aggregator. If autowatch is set, a single `watch`
// will always be invoked on the provided client. If it is not set, but a `watch client`(wc)
// is passed, a `watch` will be run on the watch client in the absence of external watchers,
// which will swap watching over to the main client. If no watch client is set and autowatch is off
// then a single watch will only run when an external watch is requested.
func newWatchAggregator(c, wc Client, autoWatch bool, autoWatchRetry time.Duration) *watchAggregator {
if autoWatchRetry == 0 {
autoWatchRetry = defaultAutoWatchRetry
}
aggregator := &watchAggregator{
Client: c,
passiveClient: wc,
autoWatch: autoWatch,
autoWatchRetry: autoWatchRetry,
log: log.DefaultLogger(),
Expand All @@ -39,20 +45,24 @@ type subscriber struct {

type watchAggregator struct {
Client
passiveClient Client
autoWatch bool
autoWatchRetry time.Duration
log log.Logger
cancelAutoWatch context.CancelFunc

subscriberLock sync.Mutex
subscribers []subscriber
cancelPassive context.CancelFunc
}

// Start initiates auto watching if configured to do so.
// SetLog should not be called after Start.
func (c *watchAggregator) Start() {
if c.autoWatch {
c.startAutoWatch()
c.startAutoWatch(true)
} else if c.passiveClient != nil {
c.startAutoWatch(false)
}
}

Expand All @@ -66,12 +76,17 @@ func (c *watchAggregator) String() string {
return fmt.Sprintf("%s.(+aggregator)", c.Client)
}

func (c *watchAggregator) startAutoWatch() {
func (c *watchAggregator) startAutoWatch(full bool) {
ctx, cancel := context.WithCancel(context.Background())
c.cancelAutoWatch = cancel
go func() {
for {
results := c.Watch(ctx)
var results <-chan Result
if full {
results = c.Watch(ctx)
} else if c.passiveClient != nil {
results = c.passiveWatch(ctx)
}
LOOP:
for {
select {
Expand All @@ -98,6 +113,29 @@ func (c *watchAggregator) startAutoWatch() {
}()
}

// passiveWatch is a degraded form of watch, where watch only hits the 'passive client'
// unless distribution is actually needed.
func (c *watchAggregator) passiveWatch(ctx context.Context) <-chan Result {
c.subscriberLock.Lock()
defer c.subscriberLock.Unlock()

if c.cancelPassive != nil {
c.log.Warn("watch_aggregator", "only support one passive watch")
return nil
}

wc := make(chan Result)
if len(c.subscribers) == 0 {
ctx, cancel := context.WithCancel(ctx)
c.cancelPassive = cancel
go c.sink(c.passiveClient.Watch(ctx), wc)
} else {
// trigger the startAutowatch to retry on backoff
close(wc)
}
return wc
}

func (c *watchAggregator) Watch(ctx context.Context) <-chan Result {
c.subscriberLock.Lock()
defer c.subscriberLock.Unlock()
Expand All @@ -106,12 +144,23 @@ func (c *watchAggregator) Watch(ctx context.Context) <-chan Result {
c.subscribers = append(c.subscribers, sub)

if len(c.subscribers) == 1 {
if c.cancelPassive != nil {
c.cancelPassive()
c.cancelPassive = nil
}
ctx, cancel := context.WithCancel(context.Background())
go c.distribute(c.Client.Watch(ctx), cancel)
}
return sub.c
}

func (c *watchAggregator) sink(in <-chan Result, out chan Result) {
defer close(out)
for range in {
continue
}
}

func (c *watchAggregator) distribute(in <-chan Result, cancel context.CancelFunc) {
defer cancel()
for {
Expand Down
55 changes: 54 additions & 1 deletion client/aggregator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@ package client
import (
"sync"
"testing"
"time"

"github.com/drand/drand/client/test/result/mock"
)

func TestAggregatorClose(t *testing.T) {
Expand All @@ -17,7 +20,7 @@ func TestAggregatorClose(t *testing.T) {
},
}

ac := newWatchAggregator(c, true, 0)
ac := newWatchAggregator(c, nil, true, 0)

err := ac.Close() // should cancel the autoWatch and close the underlying client
if err != nil {
Expand All @@ -26,3 +29,53 @@ func TestAggregatorClose(t *testing.T) {

wg.Wait() // wait for underlying client to close
}

func TestAggregatorPassive(t *testing.T) {
wg := sync.WaitGroup{}
wg.Add(1)

c := &MockClient{
WatchCh: make(chan Result, 1),
CloseF: func() error {
wg.Done()
return nil
},
}

wc := &MockClient{
WatchCh: make(chan Result, 1),
CloseF: func() error {
return nil
},
}

ac := newWatchAggregator(c, wc, false, 0)

wc.WatchCh <- &mock.Result{Rnd: 1234}
c.WatchCh <- &mock.Result{Rnd: 5678}

ac.Start()

time.Sleep(50 * time.Millisecond)

zzz := time.NewTimer(time.Millisecond * 50)
select {
case w := <-wc.WatchCh:
t.Fatalf("passive watch should be drained, but got %v", w)
case <-zzz.C:
}

zzz = time.NewTimer(time.Millisecond * 50)
select {
case <-c.WatchCh:
case <-zzz.C:
t.Fatalf("active watch should not have been called but was")
}

err := ac.Close()
if err != nil {
t.Fatal(err)
}

wg.Wait()
}
2 changes: 1 addition & 1 deletion client/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func TestCacheWatch(t *testing.T) {
t.Fatal(err)
}
cache, _ := NewCachingClient(m, arcCache)
c := newWatchAggregator(cache, false, 0)
c := newWatchAggregator(cache, nil, false, 0)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
r1 := c.Watch(ctx)
Expand Down
2 changes: 1 addition & 1 deletion client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ func makeClient(cfg *clientConfig) (Client, error) {
return nil, err
}

wa := newWatchAggregator(c, cfg.autoWatch, cfg.autoWatchRetry)
wa := newWatchAggregator(c, wc, cfg.autoWatch, cfg.autoWatchRetry)
c = wa
trySetLog(c, cfg.log)

Expand Down
2 changes: 1 addition & 1 deletion core/drand_public.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (
"github.com/drand/kyber/encrypt/ecies"
)

// FreshDKG is the public method to call during a DKG protocol.
// BroadcastDKG is the public method to call during a DKG protocol.
func (d *Drand) BroadcastDKG(c context.Context, in *drand.DKGPacket) (*drand.Empty, error) {
d.state.Lock()
defer d.state.Unlock()
Expand Down
12 changes: 9 additions & 3 deletions lp2p/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
type Client struct {
cancel func()
latest uint64
cache client.Cache
log log.Logger

subs struct {
Expand Down Expand Up @@ -54,6 +55,7 @@ func NewWithPubsub(ps *pubsub.PubSub, info *chain.Info, cache client.Cache) (*Cl
ctx, cancel := context.WithCancel(context.Background())
c := &Client{
cancel: cancel,
cache: cache,
log: log.DefaultLogger(),
}

Expand Down Expand Up @@ -161,13 +163,17 @@ func (c *Client) Watch(ctx context.Context) <-chan client.Result {
close(outerCh)
return
}
select {
case outerCh <- &client.RandomData{
dat := &client.RandomData{
Rnd: resp.Round,
Random: resp.Randomness,
Sig: resp.Signature,
PreviousSignature: resp.PreviousSignature,
}:
}
if c.cache != nil {
c.cache.Add(resp.Round, dat)
}
select {
case outerCh <- dat:
default:
c.log.Warn("gossip client", "randomness notification dropped due to a full channel")
}
Expand Down

0 comments on commit 7717801

Please sign in to comment.