Skip to content

Commit

Permalink
storage: check prefix in unsynced
Browse files Browse the repository at this point in the history
Current syncWatchers method skips the events that have
prefixes that are being watched when the prefix is not
existent as a key. This fixes etcd-io#4191
by adding prefix checking to not skip those events.
  • Loading branch information
gyuho committed Jan 13, 2016
1 parent ca9ad68 commit ee6892d
Show file tree
Hide file tree
Showing 2 changed files with 88 additions and 1 deletion.
19 changes: 18 additions & 1 deletion storage/watchable_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"fmt"
"log"
"math"
"strings"
"sync"
"time"

Expand Down Expand Up @@ -261,6 +262,17 @@ func (s *watchableStore) syncWatchersLoop() {
}
}

// matchPrefix returns true if key has any matching prefix
// from prefixes map.
func matchPrefix(key string, prefixes map[string]struct{}) bool {
for p := range prefixes {
if strings.HasPrefix(key, p) {
return true
}
}
return false
}

// syncWatchers periodically syncs unsynced watchers by: Iterate all unsynced
// watchers to get the minimum revision within its range, skipping the
// watcher if its current revision is behind the compact revision of the
Expand All @@ -284,6 +296,7 @@ func (s *watchableStore) syncWatchers() {

// TODO: change unsynced struct type same to this
keyToUnsynced := make(map[string]map[*watcher]struct{})
prefixToWatch := make(map[string]struct{})

for w := range s.unsynced {
k := string(w.key)
Expand All @@ -307,6 +320,10 @@ func (s *watchableStore) syncWatchers() {
keyToUnsynced[k] = make(map[*watcher]struct{})
}
keyToUnsynced[k][w] = struct{}{}

if w.prefix {
prefixToWatch[k] = struct{}{}
}
}

minBytes, maxBytes := newRevBytes(), newRevBytes()
Expand All @@ -330,7 +347,7 @@ func (s *watchableStore) syncWatchers() {
}

k := string(kv.Key)
if _, ok := keyToUnsynced[k]; !ok {
if _, ok := keyToUnsynced[k]; !ok && !matchPrefix(k, prefixToWatch) {
continue
}

Expand Down
70 changes: 70 additions & 0 deletions storage/watcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package storage

import (
"bytes"
"testing"

"github.com/coreos/etcd/lease"
Expand Down Expand Up @@ -73,6 +74,75 @@ func TestWatcherWatchID(t *testing.T) {
}
}

// TestWatcherWatchPrefix tests if Watch operation correctly watches
// and returns events with matching prefixes.
func TestWatcherWatchPrefix(t *testing.T) {
b, tmpPath := backend.NewDefaultTmpBackend()
s := WatchableKV(newWatchableStore(b, &lease.FakeLessor{}))
defer cleanup(s, b, tmpPath)

w := s.NewWatchStream()
defer w.Close()

idm := make(map[WatchID]struct{})

prefixMatch := true
val := []byte("bar")
key, keyL := []byte("foo"), []byte("foobar")

for i := 0; i < 10; i++ {
id := w.Watch(key, prefixMatch, 0)
if _, ok := idm[id]; ok {
t.Errorf("#%d: id %d exists", i, id)
}
idm[id] = struct{}{}

s.Put(keyL, val, lease.NoLease)

resp := <-w.Chan()
if resp.WatchID != id {
t.Errorf("#%d: watch id in event = %d, want %d", i, resp.WatchID, id)
}

if err := w.Cancel(id); err != nil {
t.Error(err)
}

if len(resp.Events) > 1 {
if !bytes.Equal(resp.Events[0].Kv.Key, keyL) {
t.Errorf("#%d: resp.Events got = %s, want = %s", i, resp.Events[0].Kv.Key, keyL)
}
}
}

key1, key1L := []byte("foo1"), []byte("foo1bar")
s.Put(key1L, val, lease.NoLease)

// unsynced watchers
for i := 10; i < 20; i++ {
id := w.Watch(key1, prefixMatch, 1)
if _, ok := idm[id]; ok {
t.Errorf("#%d: id %d exists", i, id)
}
idm[id] = struct{}{}

resp := <-w.Chan()
if resp.WatchID != id {
t.Errorf("#%d: watch id in event = %d, want %d", i, resp.WatchID, id)
}

if err := w.Cancel(id); err != nil {
t.Error(err)
}

if len(resp.Events) > 1 {
if !bytes.Equal(resp.Events[0].Kv.Key, key1L) {
t.Errorf("#%d: resp.Events got = %s, want = %s", i, resp.Events[0].Kv.Key, key1L)
}
}
}
}

// TestWatchStreamCancel ensures cancel calls the cancel func of the watcher
// with given id inside watchStream.
func TestWatchStreamCancelWatcherByID(t *testing.T) {
Expand Down

0 comments on commit ee6892d

Please sign in to comment.