Skip to content

Commit

Permalink
Merge pull request #24 from mthenw/fix-watch-list
Browse files Browse the repository at this point in the history
use rev returned by List in Watch. Closes #22
  • Loading branch information
abronan committed Aug 30, 2017
2 parents 4f0a102 + 7a8d4c5 commit 0c1d214
Showing 1 changed file with 31 additions and 32 deletions.
63 changes: 31 additions & 32 deletions store/etcd/v3/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,7 @@ func (s *EtcdV3) WatchTree(directory string, stopCh <-chan struct{}) (<-chan []*
respCh := make(chan []*store.KVPair)

// Get the current value
pairs, err := s.List(directory)
rev, pairs, err := s.list(directory)
if err != nil {
return nil, err
}
Expand All @@ -253,15 +253,8 @@ func (s *EtcdV3) WatchTree(directory string, stopCh <-chan struct{}) (<-chan []*
// Push the current value through the channel.
respCh <- pairs

var minRev uint64
for _, pair := range pairs {
if minRev < pair.LastIndex {
minRev = pair.LastIndex
}
}
minRev++

watchCh := wc.Watch(context.Background(), s.normalize(directory), etcd.WithPrefix(), etcd.WithRev(int64(minRev)))
rev++
watchCh := wc.Watch(context.Background(), s.normalize(directory), etcd.WithPrefix(), etcd.WithRev(rev))

for resp := range watchCh {
// Check if the watch was stopped by the caller
Expand Down Expand Up @@ -379,28 +372,8 @@ func (s *EtcdV3) AtomicDelete(key string, previous *store.KVPair) (bool, error)

// List child nodes of a given directory
func (s *EtcdV3) List(directory string) ([]*store.KVPair, error) {
ctx, cancel := context.WithTimeout(context.Background(), etcdDefaultTimeout)
resp, err := s.client.KV.Get(ctx, s.normalize(directory), etcd.WithPrefix(), etcd.WithSort(etcd.SortByKey, etcd.SortDescend))
cancel()
if err != nil {
return nil, err
}

if resp.Count == 0 {
return nil, store.ErrKeyNotFound
}

kv := []*store.KVPair{}

for _, n := range resp.Kvs {
kv = append(kv, &store.KVPair{
Key: string(n.Key),
Value: []byte(n.Value),
LastIndex: uint64(n.ModRevision),
})
}

return kv, nil
_, kv, err := s.list(directory)
return kv, err
}

// DeleteTree deletes a range of keys under a given directory
Expand Down Expand Up @@ -508,3 +481,29 @@ func (l *etcdLock) Unlock() error {
func (s *EtcdV3) Close() {
s.client.Close()
}

// list child nodes of a given directory and return revision number
func (s *EtcdV3) list(directory string) (int64, []*store.KVPair, error) {
ctx, cancel := context.WithTimeout(context.Background(), etcdDefaultTimeout)
resp, err := s.client.KV.Get(ctx, s.normalize(directory), etcd.WithPrefix(), etcd.WithSort(etcd.SortByKey, etcd.SortDescend))
cancel()
if err != nil {
return 0, nil, err
}

if resp.Count == 0 {
return 0, nil, store.ErrKeyNotFound
}

kv := []*store.KVPair{}

for _, n := range resp.Kvs {
kv = append(kv, &store.KVPair{
Key: string(n.Key),
Value: []byte(n.Value),
LastIndex: uint64(n.ModRevision),
})
}

return resp.Header.Revision, kv, nil
}

0 comments on commit 0c1d214

Please sign in to comment.