Skip to content

Commit

Permalink
clientv3: combine Watch, WatchPrefix with variadic
Browse files Browse the repository at this point in the history
  • Loading branch information
gyuho committed Feb 24, 2016
1 parent 53f94c2 commit e36e263
Show file tree
Hide file tree
Showing 5 changed files with 73 additions and 47 deletions.
4 changes: 2 additions & 2 deletions clientv3/example_watch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func ExampleWatcher_watch() {
wc := clientv3.NewWatcher(cli)
defer wc.Close()

rch := wc.Watch(context.Background(), "foo", 0)
rch := wc.Watch(context.Background(), "foo")
for wresp := range rch {
for _, ev := range wresp.Events {
fmt.Printf("%s %q : %q\n", ev.Type, ev.Kv.Key, ev.Kv.Value)
Expand All @@ -57,7 +57,7 @@ func ExampleWatcher_watchPrefix() {
wc := clientv3.NewWatcher(cli)
defer wc.Close()

rch := wc.WatchPrefix(context.Background(), "foo", 0)
rch := wc.Watch(context.Background(), "foo", clientv3.WithPrefix())
for wresp := range rch {
for _, ev := range wresp.Events {
fmt.Printf("%s %q : %q\n", ev.Type, ev.Kv.Key, ev.Kv.Value)
Expand Down
2 changes: 1 addition & 1 deletion clientv3/integration/watch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ func testWatchMultiWatcher(t *testing.T, wctx *watchctx) {
}
// prefix watcher on "b" (bar and baz)
go func() {
prefixc := wctx.w.WatchPrefix(context.TODO(), "b", 0)
prefixc := wctx.w.Watch(context.TODO(), "b")
if prefixc == nil {
t.Fatalf("expected watcher channel, got nil")
}
Expand Down
68 changes: 51 additions & 17 deletions clientv3/op.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ const (
tRange opType = iota + 1
tPut
tDeleteRange
tWatch
)

// Op represents an Operation that kv can execute.
Expand All @@ -36,10 +37,15 @@ type Op struct {

// for range
limit int64
rev int64
sort *SortOption
serializable bool

// for range, watch
rev int64

// for watch
prefix string

// for put
val []byte
leaseID lease.LeaseID
Expand All @@ -65,6 +71,26 @@ func (op Op) toRequestUnion() *pb.RequestUnion {
}
}

func (op Op) toWatchRequest() *watchRequest {
switch op.t {
case tWatch:
key := string(op.key)
prefix := op.prefix
if prefix != "" { // either watch key or prefix
key = ""
}
wr := &watchRequest{
key: key,
prefix: prefix,
rev: op.rev,
}
return wr

default:
panic("Only for tWatch")
}
}

func (op Op) isWrite() bool {
return op.t != tRange
}
Expand Down Expand Up @@ -111,6 +137,12 @@ func OpPut(key, val string, opts ...OpOption) Op {
return ret
}

func opWatch(key string, opts ...OpOption) Op {
ret := Op{t: tWatch, key: []byte(key)}
ret.applyOpts(opts)
return ret
}

func (op *Op) applyOpts(opts []OpOption) {
for _, opt := range opts {
opt(op)
Expand All @@ -129,8 +161,7 @@ func WithLease(leaseID lease.LeaseID) OpOption {
func WithLimit(n int64) OpOption { return func(op *Op) { op.limit = n } }

// WithRev specifies the store revision for 'Get' request.
//
// TODO: support Watch API
// Or the start revision of 'Watch' request.
func WithRev(rev int64) OpOption { return func(op *Op) { op.rev = rev } }

// WithSort specifies the ordering in 'Get' request. It requires
Expand All @@ -143,25 +174,28 @@ func WithSort(target SortTarget, order SortOrder) OpOption {
}
}

// WithPrefix enables 'Get' or 'Delete' requests to operate on the
// keys with matching prefix. For example, 'Get(foo, WithPrefix())'
// WithPrefix enables 'Get', 'Delete', or 'Watch' requests to operate
// on the keys with matching prefix. For example, 'Get(foo, WithPrefix())'
// can return 'foo1', 'foo2', and so on.
//
// TODO: support Watch API
func WithPrefix() OpOption {
return func(op *Op) {
op.end = make([]byte, len(op.key))
copy(op.end, op.key)
for i := len(op.end) - 1; i >= 0; i-- {
if op.end[i] < 0xff {
op.end[i] = op.end[i] + 1
op.end = op.end[:i+1]
return
switch op.t {
case tWatch:
op.prefix = string(op.key)
default:
op.end = make([]byte, len(op.key))
copy(op.end, op.key)
for i := len(op.end) - 1; i >= 0; i-- {
if op.end[i] < 0xff {
op.end[i] = op.end[i] + 1
op.end = op.end[:i+1]
return
}
}
// next prefix does not exist (e.g., 0xffff);
// default to WithFromKey policy
op.end = []byte{0}
}
// next prefix does not exist (e.g., 0xffff);
// default to WithFromKey policy
op.end = []byte{0}
}
}

Expand Down
2 changes: 1 addition & 1 deletion clientv3/sync/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ func (s *syncer) SyncUpdates(ctx context.Context) clientv3.WatchChan {

// get all events since revision (or get non-compacted revision, if
// rev is too far behind)
wch := wapi.WatchPrefix(ctx, s.prefix, s.rev)
wch := wapi.Watch(ctx, s.prefix, clientv3.WithPrefix(), clientv3.WithRev(s.rev))
for wr := range wch {
respchan <- wr
}
Expand Down
44 changes: 18 additions & 26 deletions clientv3/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,17 +27,11 @@ import (
type WatchChan <-chan WatchResponse

type Watcher interface {
// Watch watches on a single key. The watched events will be returned
// Watch watches on a key or prefix. The watched events will be returned
// through the returned channel.
// If the watch is slow or the required rev is compacted, the watch request
// might be canceled from the server-side and the chan will be closed.
Watch(ctx context.Context, key string, rev int64) WatchChan

// WatchPrefix watches on a prefix. The watched events will be returned
// through the returned channel.
// If the watch is slow or the required rev is compacted, the watch request
// might be canceled from the server-side and the chan will be closed.
WatchPrefix(ctx context.Context, prefix string, rev int64) WatchChan
Watch(ctx context.Context, key string, opts ...OpOption) WatchChan

// Close closes the watcher and cancels all watch requests.
Close() error
Expand Down Expand Up @@ -127,27 +121,16 @@ func NewWatcher(c *Client) Watcher {
return w
}

func (w *watcher) Watch(ctx context.Context, key string, rev int64) WatchChan {
return w.watch(ctx, key, "", rev)
}
// watch posts a watch request to run() and waits for a new watcher channel
func (w *watcher) Watch(ctx context.Context, key string, opts ...OpOption) WatchChan {
ow := opWatch(key, opts...)

func (w *watcher) WatchPrefix(ctx context.Context, prefix string, rev int64) WatchChan {
return w.watch(ctx, "", prefix, rev)
}
wr := ow.toWatchRequest()
wr.ctx = ctx

func (w *watcher) Close() error {
select {
case w.stopc <- struct{}{}:
case <-w.donec:
}
<-w.donec
return <-w.errc
}

// watch posts a watch request to run() and waits for a new watcher channel
func (w *watcher) watch(ctx context.Context, key, prefix string, rev int64) WatchChan {
retc := make(chan chan WatchResponse, 1)
wr := &watchRequest{ctx: ctx, key: key, prefix: prefix, rev: rev, retc: retc}
wr.retc = retc

// submit request
select {
case w.reqc <- wr:
Expand All @@ -167,6 +150,15 @@ func (w *watcher) watch(ctx context.Context, key, prefix string, rev int64) Watc
}
}

func (w *watcher) Close() error {
select {
case w.stopc <- struct{}{}:
case <-w.donec:
}
<-w.donec
return <-w.errc
}

func (w *watcher) addStream(resp *pb.WatchResponse, pendingReq *watchRequest) {
if pendingReq == nil {
// no pending request; ignore
Expand Down

0 comments on commit e36e263

Please sign in to comment.