From ee6892dd03e0b4a405e6069ff0989fddaad75b38 Mon Sep 17 00:00:00 2001 From: Gyu-Ho Lee Date: Wed, 13 Jan 2016 09:50:03 -0800 Subject: [PATCH] storage: check prefix in unsynced Current syncWatchers method skips the events that have prefixes that are being watched when the prefix is not existent as a key. This fixes https://github.com/coreos/etcd/issues/4191 by adding prefix checking to not skip those events. --- storage/watchable_store.go | 19 ++++++++++- storage/watcher_test.go | 70 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 88 insertions(+), 1 deletion(-) diff --git a/storage/watchable_store.go b/storage/watchable_store.go index ae5f0f71ed87..0d795db47d97 100644 --- a/storage/watchable_store.go +++ b/storage/watchable_store.go @@ -18,6 +18,7 @@ import ( "fmt" "log" "math" + "strings" "sync" "time" @@ -261,6 +262,17 @@ func (s *watchableStore) syncWatchersLoop() { } } +// matchPrefix returns true if key has any matching prefix +// from prefixes map. +func matchPrefix(key string, prefixes map[string]struct{}) bool { + for p := range prefixes { + if strings.HasPrefix(key, p) { + return true + } + } + return false +} + // syncWatchers periodically syncs unsynced watchers by: Iterate all unsynced // watchers to get the minimum revision within its range, skipping the // watcher if its current revision is behind the compact revision of the @@ -284,6 +296,7 @@ func (s *watchableStore) syncWatchers() { // TODO: change unsynced struct type same to this keyToUnsynced := make(map[string]map[*watcher]struct{}) + prefixToWatch := make(map[string]struct{}) for w := range s.unsynced { k := string(w.key) @@ -307,6 +320,10 @@ func (s *watchableStore) syncWatchers() { keyToUnsynced[k] = make(map[*watcher]struct{}) } keyToUnsynced[k][w] = struct{}{} + + if w.prefix { + prefixToWatch[k] = struct{}{} + } } minBytes, maxBytes := newRevBytes(), newRevBytes() @@ -330,7 +347,7 @@ func (s *watchableStore) syncWatchers() { } k := string(kv.Key) - if _, ok := keyToUnsynced[k]; !ok { + if _, ok := keyToUnsynced[k]; !ok && !matchPrefix(k, prefixToWatch) { continue } diff --git a/storage/watcher_test.go b/storage/watcher_test.go index e520d9983003..e7fd367fd86b 100644 --- a/storage/watcher_test.go +++ b/storage/watcher_test.go @@ -15,6 +15,7 @@ package storage import ( + "bytes" "testing" "github.com/coreos/etcd/lease" @@ -73,6 +74,75 @@ func TestWatcherWatchID(t *testing.T) { } } +// TestWatcherWatchPrefix tests if Watch operation correctly watches +// and returns events with matching prefixes. +func TestWatcherWatchPrefix(t *testing.T) { + b, tmpPath := backend.NewDefaultTmpBackend() + s := WatchableKV(newWatchableStore(b, &lease.FakeLessor{})) + defer cleanup(s, b, tmpPath) + + w := s.NewWatchStream() + defer w.Close() + + idm := make(map[WatchID]struct{}) + + prefixMatch := true + val := []byte("bar") + key, keyL := []byte("foo"), []byte("foobar") + + for i := 0; i < 10; i++ { + id := w.Watch(key, prefixMatch, 0) + if _, ok := idm[id]; ok { + t.Errorf("#%d: id %d exists", i, id) + } + idm[id] = struct{}{} + + s.Put(keyL, val, lease.NoLease) + + resp := <-w.Chan() + if resp.WatchID != id { + t.Errorf("#%d: watch id in event = %d, want %d", i, resp.WatchID, id) + } + + if err := w.Cancel(id); err != nil { + t.Error(err) + } + + if len(resp.Events) > 1 { + if !bytes.Equal(resp.Events[0].Kv.Key, keyL) { + t.Errorf("#%d: resp.Events got = %s, want = %s", i, resp.Events[0].Kv.Key, keyL) + } + } + } + + key1, key1L := []byte("foo1"), []byte("foo1bar") + s.Put(key1L, val, lease.NoLease) + + // unsynced watchers + for i := 10; i < 20; i++ { + id := w.Watch(key1, prefixMatch, 1) + if _, ok := idm[id]; ok { + t.Errorf("#%d: id %d exists", i, id) + } + idm[id] = struct{}{} + + resp := <-w.Chan() + if resp.WatchID != id { + t.Errorf("#%d: watch id in event = %d, want %d", i, resp.WatchID, id) + } + + if err := w.Cancel(id); err != nil { + t.Error(err) + } + + if len(resp.Events) > 1 { + if !bytes.Equal(resp.Events[0].Kv.Key, key1L) { + t.Errorf("#%d: resp.Events got = %s, want = %s", i, resp.Events[0].Kv.Key, key1L) + } + } + } +} + // TestWatchStreamCancel ensures cancel calls the cancel func of the watcher // with given id inside watchStream. func TestWatchStreamCancelWatcherByID(t *testing.T) {