diff --git a/clientv3/example_watch_test.go b/clientv3/example_watch_test.go index 7c7ba1579f1..8c49821ac69 100644 --- a/clientv3/example_watch_test.go +++ b/clientv3/example_watch_test.go @@ -35,7 +35,7 @@ func ExampleWatcher_watch() { wc := clientv3.NewWatcher(cli) defer wc.Close() - rch := wc.Watch(context.Background(), "foo", 0) + rch := wc.Watch(context.Background(), "foo") for wresp := range rch { for _, ev := range wresp.Events { fmt.Printf("%s %q : %q\n", ev.Type, ev.Kv.Key, ev.Kv.Value) @@ -57,7 +57,7 @@ func ExampleWatcher_watchPrefix() { wc := clientv3.NewWatcher(cli) defer wc.Close() - rch := wc.WatchPrefix(context.Background(), "foo", 0) + rch := wc.Watch(context.Background(), "foo", clientv3.WithPrefix()) for wresp := range rch { for _, ev := range wresp.Events { fmt.Printf("%s %q : %q\n", ev.Type, ev.Kv.Key, ev.Kv.Value) diff --git a/clientv3/integration/watch_test.go b/clientv3/integration/watch_test.go index cf5b7f493d2..b0a450569af 100644 --- a/clientv3/integration/watch_test.go +++ b/clientv3/integration/watch_test.go @@ -94,7 +94,7 @@ func testWatchMultiWatcher(t *testing.T, wctx *watchctx) { } // prefix watcher on "b" (bar and baz) go func() { - prefixc := wctx.w.WatchPrefix(context.TODO(), "b", 0) + prefixc := wctx.w.Watch(context.TODO(), "b") if prefixc == nil { t.Fatalf("expected watcher channel, got nil") } diff --git a/clientv3/op.go b/clientv3/op.go index 7a5549720e4..8d5352032a1 100644 --- a/clientv3/op.go +++ b/clientv3/op.go @@ -26,6 +26,7 @@ const ( tRange opType = iota + 1 tPut tDeleteRange + tWatch ) // Op represents an Operation that kv can execute. @@ -36,10 +37,12 @@ type Op struct { // for range limit int64 - rev int64 sort *SortOption serializable bool + // for range, watch + rev int64 + // for put val []byte leaseID lease.LeaseID @@ -65,6 +68,26 @@ func (op Op) toRequestUnion() *pb.RequestUnion { } } +func (op Op) toWatchRequest() *watchRequest { + switch op.t { + case tWatch: + key := string(op.key) + prefix := string(op.end) + if prefix != "" { // either watch key or prefix + key = "" + } + wr := &watchRequest{ + key: key, + prefix: prefix, + rev: op.rev, + } + return wr + + default: + panic("Only for tWatch") + } +} + func (op Op) isWrite() bool { return op.t != tRange } @@ -111,6 +134,12 @@ func OpPut(key, val string, opts ...OpOption) Op { return ret } +func opWatch(key string, opts ...OpOption) Op { + ret := Op{t: tWatch, key: []byte(key)} + ret.applyOpts(opts) + return ret +} + func (op *Op) applyOpts(opts []OpOption) { for _, opt := range opts { opt(op) @@ -129,8 +158,7 @@ func WithLease(leaseID lease.LeaseID) OpOption { func WithLimit(n int64) OpOption { return func(op *Op) { op.limit = n } } // WithRev specifies the store revision for 'Get' request. -// -// TODO: support Watch API +// Or the start revision of 'Watch' request. func WithRev(rev int64) OpOption { return func(op *Op) { op.rev = rev } } // WithSort specifies the ordering in 'Get' request. It requires @@ -143,11 +171,9 @@ func WithSort(target SortTarget, order SortOrder) OpOption { } } -// WithPrefix enables 'Get' or 'Delete' requests to operate on the -// keys with matching prefix. For example, 'Get(foo, WithPrefix())' +// WithPrefix enables 'Get', 'Delete', or 'Watch' requests to operate +// on the keys with matching prefix. For example, 'Get(foo, WithPrefix())' // can return 'foo1', 'foo2', and so on. -// -// TODO: support Watch API func WithPrefix() OpOption { return func(op *Op) { op.end = make([]byte, len(op.key)) diff --git a/clientv3/sync/syncer.go b/clientv3/sync/syncer.go index d416c660cc1..59bf95cbd27 100644 --- a/clientv3/sync/syncer.go +++ b/clientv3/sync/syncer.go @@ -116,7 +116,7 @@ func (s *syncer) SyncUpdates(ctx context.Context) clientv3.WatchChan { // get all events since revision (or get non-compacted revision, if // rev is too far behind) - wch := wapi.WatchPrefix(ctx, s.prefix, s.rev) + wch := wapi.Watch(ctx, s.prefix, clientv3.WithPrefix(), clientv3.WithRev(s.rev)) for wr := range wch { respchan <- wr } diff --git a/clientv3/watch.go b/clientv3/watch.go index cbc7df456d3..98931a6be40 100644 --- a/clientv3/watch.go +++ b/clientv3/watch.go @@ -27,17 +27,11 @@ import ( type WatchChan <-chan WatchResponse type Watcher interface { - // Watch watches on a single key. The watched events will be returned + // Watch watches on a key or prefix. The watched events will be returned // through the returned channel. // If the watch is slow or the required rev is compacted, the watch request // might be canceled from the server-side and the chan will be closed. - Watch(ctx context.Context, key string, rev int64) WatchChan - - // WatchPrefix watches on a prefix. The watched events will be returned - // through the returned channel. - // If the watch is slow or the required rev is compacted, the watch request - // might be canceled from the server-side and the chan will be closed. - WatchPrefix(ctx context.Context, prefix string, rev int64) WatchChan + Watch(ctx context.Context, key string, opts ...OpOption) WatchChan // Close closes the watcher and cancels all watch requests. Close() error @@ -127,27 +121,16 @@ func NewWatcher(c *Client) Watcher { return w } -func (w *watcher) Watch(ctx context.Context, key string, rev int64) WatchChan { - return w.watch(ctx, key, "", rev) -} +// Watch posts a watch request to run() and waits for a new watcher channel +func (w *watcher) Watch(ctx context.Context, key string, opts ...OpOption) WatchChan { + ow := opWatch(key, opts...) -func (w *watcher) WatchPrefix(ctx context.Context, prefix string, rev int64) WatchChan { - return w.watch(ctx, "", prefix, rev) -} + wr := ow.toWatchRequest() + wr.ctx = ctx -func (w *watcher) Close() error { - select { - case w.stopc <- struct{}{}: - case <-w.donec: - } - <-w.donec - return <-w.errc -} - -// watch posts a watch request to run() and waits for a new watcher channel -func (w *watcher) watch(ctx context.Context, key, prefix string, rev int64) WatchChan { retc := make(chan chan WatchResponse, 1) - wr := &watchRequest{ctx: ctx, key: key, prefix: prefix, rev: rev, retc: retc} + wr.retc = retc + // submit request select { case w.reqc <- wr: @@ -167,6 +150,15 @@ func (w *watcher) watch(ctx context.Context, key, prefix string, rev int64) Watc } } +func (w *watcher) Close() error { + select { + case w.stopc <- struct{}{}: + case <-w.donec: + } + <-w.donec + return <-w.errc +} + func (w *watcher) addStream(resp *pb.WatchResponse, pendingReq *watchRequest) { if pendingReq == nil { // no pending request; ignore