Skip to content

Commit

Permalink
Merge pull request #4120 from xiang90/ctrl_w
Browse files Browse the repository at this point in the history
*: support watcher cancellation inside watchStream
  • Loading branch information
xiang90 committed Jan 3, 2016
2 parents 94ac9ae + eda0eef commit c832d7f
Show file tree
Hide file tree
Showing 5 changed files with 83 additions and 6 deletions.
14 changes: 12 additions & 2 deletions etcdctlv3/command/watch_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"fmt"
"io"
"os"
"strconv"
"strings"

"github.com/coreos/etcd/Godeps/_workspace/src/github.com/spf13/cobra"
Expand Down Expand Up @@ -67,7 +68,7 @@ func watchCommandFunc(cmd *cobra.Command, args []string) {
// TODO: support start and end revision
segs := strings.Split(l, " ")
if len(segs) != 2 {
fmt.Fprintf(os.Stderr, "Invalid watch request format: use watch key or watchprefix prefix\n")
fmt.Fprintf(os.Stderr, "Invalid watch request format: use \"watch [key]\", \"watchprefix [prefix]\" or \"cancel [watcher ID]\"\n")
continue
}

Expand All @@ -77,8 +78,15 @@ func watchCommandFunc(cmd *cobra.Command, args []string) {
r = &pb.WatchRequest{CreateRequest: &pb.WatchCreateRequest{Key: []byte(segs[1])}}
case "watchprefix":
r = &pb.WatchRequest{CreateRequest: &pb.WatchCreateRequest{Prefix: []byte(segs[1])}}
case "cancel":
id, perr := strconv.ParseInt(segs[1], 10, 64)
if perr != nil {
fmt.Fprintf(os.Stderr, "Invalid cancel ID (%v)\n", perr)
continue
}
r = &pb.WatchRequest{CancelRequest: &pb.WatchCancelRequest{WatchId: id}}
default:
fmt.Fprintf(os.Stderr, "Invalid watch request format: use watch key or watchprefix prefix\n")
fmt.Fprintf(os.Stderr, "Invalid watch request type: use watch, watchprefix or cancel\n")
continue
}

Expand All @@ -103,6 +111,8 @@ func recvLoop(wStream pb.Watch_WatchClient) {
// TODO: handle canceled/compacted and other control response types
case resp.Created:
fmt.Printf("watcher created: id %08x\n", resp.WatchId)
case resp.Canceled:
fmt.Printf("watcher canceled: id %08x\n", resp.WatchId)
default:
for _, ev := range resp.Events {
fmt.Printf("%s: %s %s\n", ev.Type, string(ev.Kv.Key), string(ev.Kv.Value))
Expand Down
12 changes: 11 additions & 1 deletion etcdserver/api/v3rpc/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,18 @@ func (sws *serverWatchStream) recvLoop() error {
WatchId: id,
Created: true,
}
case req.CancelRequest != nil:
id := req.CancelRequest.WatchId
err := sws.watchStream.Cancel(id)
if err == nil {
sws.ctrlStream <- &pb.WatchResponse{
// TODO: fill in response header.
WatchId: id,
Canceled: true,
}
}
// TODO: do we need to return error back to client?
default:
// TODO: support cancellation
panic("not implemented")
}
}
Expand Down
1 change: 1 addition & 0 deletions storage/watchable_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,7 @@ func (s *watchableStore) NewWatchStream() WatchStream {
return &watchStream{
watchable: s,
ch: make(chan []storagepb.Event, chanBufLen),
cancels: make(map[int64]CancelFunc),
}
}

Expand Down
26 changes: 23 additions & 3 deletions storage/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,16 @@
package storage

import (
"errors"
"sync"

"github.com/coreos/etcd/storage/storagepb"
)

var (
ErrWatcherNotExist = errors.New("storage: watcher does not exist")
)

type WatchStream interface {
// Watch creates a watcher. The watcher watches the events happening or
// happened on the given key or key prefix from the given startRev.
Expand All @@ -30,11 +35,17 @@ type WatchStream interface {
//
// The returned `id` is the ID of this watcher. It appears as WatchID
// in events that are sent to the created watcher through stream channel.
//
// TODO: remove the returned CancelFunc. Always use Cancel.
Watch(key []byte, prefix bool, startRev int64) (id int64, cancel CancelFunc)

// Chan returns a chan. All watched events will be sent to the returned chan.
Chan() <-chan []storagepb.Event

// Cancel cancels a watcher by giving its ID. If watcher does not exist, an error will be
// returned.
Cancel(id int64) error

// Close closes the WatchChan and release all related resources.
Close()
}
Expand All @@ -49,7 +60,7 @@ type watchStream struct {
// nextID is the ID pre-allocated for next new watcher in this stream
nextID int64
closed bool
cancels []CancelFunc
cancels map[int64]CancelFunc
}

// TODO: return error if ws is closed?
Expand All @@ -65,15 +76,24 @@ func (ws *watchStream) Watch(key []byte, prefix bool, startRev int64) (id int64,

_, c := ws.watchable.watch(key, prefix, startRev, id, ws.ch)

// TODO: cancelFunc needs to be removed from the cancels when it is called.
ws.cancels = append(ws.cancels, c)
ws.cancels[id] = c
return id, c
}

func (ws *watchStream) Chan() <-chan []storagepb.Event {
return ws.ch
}

func (ws *watchStream) Cancel(id int64) error {
cancel, ok := ws.cancels[id]
if !ok {
return ErrWatcherNotExist
}
cancel()
delete(ws.cancels, id)
return nil
}

func (ws *watchStream) Close() {
ws.mu.Lock()
defer ws.mu.Unlock()
Expand Down
36 changes: 36 additions & 0 deletions storage/watcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,3 +65,39 @@ func TestWatcherWatchID(t *testing.T) {
cancel()
}
}

// TestWatchStreamCancel ensures cancel calls the cancel func of the watcher
// with given id inside watchStream.
func TestWatchStreamCancelWatcherByID(t *testing.T) {
s := WatchableKV(newWatchableStore(tmpPath))
defer cleanup(s, tmpPath)

w := s.NewWatchStream()
defer w.Close()

id, _ := w.Watch([]byte("foo"), false, 0)

tests := []struct {
cancelID int64
werr error
}{
// no error should be returned when cancel the created watcher.
{id, nil},
// not exist error should be returned when cancel again.
{id, ErrWatcherNotExist},
// not exist error should be returned when cancel a bad id.
{id + 1, ErrWatcherNotExist},
}

for i, tt := range tests {
gerr := w.Cancel(tt.cancelID)

if gerr != tt.werr {
t.Errorf("#%d: err = %v, want %v", i, gerr, tt.werr)
}
}

if l := len(w.(*watchStream).cancels); l != 0 {
t.Errorf("cancels = %d, want 0", l)
}
}

0 comments on commit c832d7f

Please sign in to comment.