Skip to content

Commit

Permalink
clientv3: combine Watch, WatchPrefix with variadic
Browse files Browse the repository at this point in the history
  • Loading branch information
gyuho committed Feb 24, 2016
1 parent 53f94c2 commit a24d276
Show file tree
Hide file tree
Showing 6 changed files with 93 additions and 56 deletions.
4 changes: 2 additions & 2 deletions clientv3/example_watch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func ExampleWatcher_watch() {
wc := clientv3.NewWatcher(cli)
defer wc.Close()

rch := wc.Watch(context.Background(), "foo", 0)
rch := wc.Watch(context.Background(), "foo")
for wresp := range rch {
for _, ev := range wresp.Events {
fmt.Printf("%s %q : %q\n", ev.Type, ev.Kv.Key, ev.Kv.Value)
Expand All @@ -57,7 +57,7 @@ func ExampleWatcher_watchPrefix() {
wc := clientv3.NewWatcher(cli)
defer wc.Close()

rch := wc.WatchPrefix(context.Background(), "foo", 0)
rch := wc.Watch(context.Background(), "foo", clientv3.WithPrefix())
for wresp := range rch {
for _, ev := range wresp.Events {
fmt.Printf("%s %q : %q\n", ev.Type, ev.Kv.Key, ev.Kv.Value)
Expand Down
2 changes: 1 addition & 1 deletion clientv3/integration/kv_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -350,7 +350,7 @@ func TestKVCompact(t *testing.T) {

wc := clientv3.NewWatcher(clus.RandClient())
defer wc.Close()
wchan := wc.Watch(ctx, "foo", 3)
wchan := wc.Watch(ctx, "foo", clientv3.WithRev(3))

if wr := <-wchan; wr.CompactRevision != 7 {
t.Fatalf("wchan CompactRevision got %v, want 7", wr.CompactRevision)
Expand Down
14 changes: 7 additions & 7 deletions clientv3/integration/watch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func testWatchMultiWatcher(t *testing.T, wctx *watchctx) {
for _, k := range keys {
// key watcher
go func(key string) {
ch := wctx.w.Watch(context.TODO(), key, 0)
ch := wctx.w.Watch(context.TODO(), key)
if ch == nil {
t.Fatalf("expected watcher channel, got nil")
}
Expand All @@ -94,7 +94,7 @@ func testWatchMultiWatcher(t *testing.T, wctx *watchctx) {
}
// prefix watcher on "b" (bar and baz)
go func() {
prefixc := wctx.w.WatchPrefix(context.TODO(), "b", 0)
prefixc := wctx.w.Watch(context.TODO(), "b", clientv3.WithPrefix())
if prefixc == nil {
t.Fatalf("expected watcher channel, got nil")
}
Expand Down Expand Up @@ -181,7 +181,7 @@ func testWatchReconnRequest(t *testing.T, wctx *watchctx) {
}
}()
// should reconnect when requesting watch
if wctx.ch = wctx.w.Watch(context.TODO(), "a", 0); wctx.ch == nil {
if wctx.ch = wctx.w.Watch(context.TODO(), "a"); wctx.ch == nil {
t.Fatalf("expected non-nil channel")
}

Expand All @@ -200,7 +200,7 @@ func TestWatchReconnInit(t *testing.T) {
}

func testWatchReconnInit(t *testing.T, wctx *watchctx) {
if wctx.ch = wctx.w.Watch(context.TODO(), "a", 0); wctx.ch == nil {
if wctx.ch = wctx.w.Watch(context.TODO(), "a"); wctx.ch == nil {
t.Fatalf("expected non-nil channel")
}
// take down watcher connection
Expand All @@ -216,7 +216,7 @@ func TestWatchReconnRunning(t *testing.T) {
}

func testWatchReconnRunning(t *testing.T, wctx *watchctx) {
if wctx.ch = wctx.w.Watch(context.TODO(), "a", 0); wctx.ch == nil {
if wctx.ch = wctx.w.Watch(context.TODO(), "a"); wctx.ch == nil {
t.Fatalf("expected non-nil channel")
}
putAndWatch(t, wctx, "a", "a")
Expand All @@ -233,7 +233,7 @@ func TestWatchCancelInit(t *testing.T) {

func testWatchCancelInit(t *testing.T, wctx *watchctx) {
ctx, cancel := context.WithCancel(context.Background())
if wctx.ch = wctx.w.Watch(ctx, "a", 0); wctx.ch == nil {
if wctx.ch = wctx.w.Watch(ctx, "a"); wctx.ch == nil {
t.Fatalf("expected non-nil watcher channel")
}
cancel()
Expand All @@ -254,7 +254,7 @@ func TestWatchCancelRunning(t *testing.T) {

func testWatchCancelRunning(t *testing.T, wctx *watchctx) {
ctx, cancel := context.WithCancel(context.Background())
if wctx.ch = wctx.w.Watch(ctx, "a", 0); wctx.ch == nil {
if wctx.ch = wctx.w.Watch(ctx, "a"); wctx.ch == nil {
t.Fatalf("expected non-nil watcher channel")
}
if _, err := wctx.kv.Put(ctx, "a", "a"); err != nil {
Expand Down
83 changes: 64 additions & 19 deletions clientv3/op.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
package clientv3

import (
"reflect"

pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
"github.com/coreos/etcd/lease"
)
Expand All @@ -36,10 +38,12 @@ type Op struct {

// for range
limit int64
rev int64
sort *SortOption
serializable bool

// for range, watch
rev int64

// for put
val []byte
leaseID lease.LeaseID
Expand All @@ -65,6 +69,27 @@ func (op Op) toRequestUnion() *pb.RequestUnion {
}
}

func (op Op) toWatchRequest() *watchRequest {
switch op.t {
case tRange:
key := string(op.key)
prefix := ""
if op.end != nil {
prefix = key
key = ""
}
wr := &watchRequest{
key: key,
prefix: prefix,
rev: op.rev,
}
return wr

default:
panic("Only for tRange")
}
}

func (op Op) isWrite() bool {
return op.t != tRange
}
Expand Down Expand Up @@ -111,6 +136,24 @@ func OpPut(key, val string, opts ...OpOption) Op {
return ret
}

func opWatch(key string, opts ...OpOption) Op {
ret := Op{t: tRange, key: []byte(key)}
ret.applyOpts(opts)
switch {
case ret.end != nil && !reflect.DeepEqual(ret.end, getPrefix(ret.key)):
panic("only supports single keys or prefixes")
case ret.leaseID != 0:
panic("unexpected lease in watch")
case ret.limit != 0:
panic("unexpected limit in watch")
case ret.sort != nil:
panic("unexpected sort in watch")
case ret.serializable != false:
panic("unexpected serializable in watch")
}
return ret
}

func (op *Op) applyOpts(opts []OpOption) {
for _, opt := range opts {
opt(op)
Expand All @@ -129,8 +172,7 @@ func WithLease(leaseID lease.LeaseID) OpOption {
func WithLimit(n int64) OpOption { return func(op *Op) { op.limit = n } }

// WithRev specifies the store revision for 'Get' request.
//
// TODO: support Watch API
// Or the start revision of 'Watch' request.
func WithRev(rev int64) OpOption { return func(op *Op) { op.rev = rev } }

// WithSort specifies the ordering in 'Get' request. It requires
Expand All @@ -143,25 +185,28 @@ func WithSort(target SortTarget, order SortOrder) OpOption {
}
}

// WithPrefix enables 'Get' or 'Delete' requests to operate on the
// keys with matching prefix. For example, 'Get(foo, WithPrefix())'
func getPrefix(key []byte) []byte {
end := make([]byte, len(key))
copy(end, key)
for i := len(end) - 1; i >= 0; i-- {
if end[i] < 0xff {
end[i] = end[i] + 1
end = end[:i+1]
return end
}
}
// next prefix does not exist (e.g., 0xffff);
// default to WithFromKey policy
end = []byte{0}
return end
}

// WithPrefix enables 'Get', 'Delete', or 'Watch' requests to operate
// on the keys with matching prefix. For example, 'Get(foo, WithPrefix())'
// can return 'foo1', 'foo2', and so on.
//
// TODO: support Watch API
func WithPrefix() OpOption {
return func(op *Op) {
op.end = make([]byte, len(op.key))
copy(op.end, op.key)
for i := len(op.end) - 1; i >= 0; i-- {
if op.end[i] < 0xff {
op.end[i] = op.end[i] + 1
op.end = op.end[:i+1]
return
}
}
// next prefix does not exist (e.g., 0xffff);
// default to WithFromKey policy
op.end = []byte{0}
op.end = getPrefix(op.key)
}
}

Expand Down
2 changes: 1 addition & 1 deletion clientv3/sync/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ func (s *syncer) SyncUpdates(ctx context.Context) clientv3.WatchChan {

// get all events since revision (or get non-compacted revision, if
// rev is too far behind)
wch := wapi.WatchPrefix(ctx, s.prefix, s.rev)
wch := wapi.Watch(ctx, s.prefix, clientv3.WithPrefix(), clientv3.WithRev(s.rev))
for wr := range wch {
respchan <- wr
}
Expand Down
44 changes: 18 additions & 26 deletions clientv3/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,17 +27,11 @@ import (
type WatchChan <-chan WatchResponse

type Watcher interface {
// Watch watches on a single key. The watched events will be returned
// Watch watches on a key or prefix. The watched events will be returned
// through the returned channel.
// If the watch is slow or the required rev is compacted, the watch request
// might be canceled from the server-side and the chan will be closed.
Watch(ctx context.Context, key string, rev int64) WatchChan

// WatchPrefix watches on a prefix. The watched events will be returned
// through the returned channel.
// If the watch is slow or the required rev is compacted, the watch request
// might be canceled from the server-side and the chan will be closed.
WatchPrefix(ctx context.Context, prefix string, rev int64) WatchChan
Watch(ctx context.Context, key string, opts ...OpOption) WatchChan

// Close closes the watcher and cancels all watch requests.
Close() error
Expand Down Expand Up @@ -127,27 +121,16 @@ func NewWatcher(c *Client) Watcher {
return w
}

func (w *watcher) Watch(ctx context.Context, key string, rev int64) WatchChan {
return w.watch(ctx, key, "", rev)
}
// Watch posts a watch request to run() and waits for a new watcher channel
func (w *watcher) Watch(ctx context.Context, key string, opts ...OpOption) WatchChan {
ow := opWatch(key, opts...)

func (w *watcher) WatchPrefix(ctx context.Context, prefix string, rev int64) WatchChan {
return w.watch(ctx, "", prefix, rev)
}
wr := ow.toWatchRequest()
wr.ctx = ctx

func (w *watcher) Close() error {
select {
case w.stopc <- struct{}{}:
case <-w.donec:
}
<-w.donec
return <-w.errc
}

// watch posts a watch request to run() and waits for a new watcher channel
func (w *watcher) watch(ctx context.Context, key, prefix string, rev int64) WatchChan {
retc := make(chan chan WatchResponse, 1)
wr := &watchRequest{ctx: ctx, key: key, prefix: prefix, rev: rev, retc: retc}
wr.retc = retc

// submit request
select {
case w.reqc <- wr:
Expand All @@ -167,6 +150,15 @@ func (w *watcher) watch(ctx context.Context, key, prefix string, rev int64) Watc
}
}

func (w *watcher) Close() error {
select {
case w.stopc <- struct{}{}:
case <-w.donec:
}
<-w.donec
return <-w.errc
}

func (w *watcher) addStream(resp *pb.WatchResponse, pendingReq *watchRequest) {
if pendingReq == nil {
// no pending request; ignore
Expand Down

0 comments on commit a24d276

Please sign in to comment.