Skip to content

Commit

Permalink
etcd: Treat etcd watcher errors by restarting the watcher
Browse files Browse the repository at this point in the history
etcd does not support watching on a compacted revision and will error out. Do a
fresh get on the latest revision and restart the watcher. In order to continue
maintaining the proper  order of events, a local cache is introduced.

The ListDone signal is only emitted once at the beginning.

On ReList, deletion events are sent for keys which can no longer be found.

Fixes: #3010

Signed-off-by: Thomas Graf <thomas@cilium.io>
  • Loading branch information
tgraf committed Mar 6, 2018
1 parent 47e01eb commit 42aaeed
Show file tree
Hide file tree
Showing 2 changed files with 121 additions and 36 deletions.
105 changes: 69 additions & 36 deletions pkg/kvstore/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
client "github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/clientv3/concurrency"
clientyaml "github.com/coreos/etcd/clientv3/yaml"
v3rpcErrors "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
"github.com/hashicorp/go-version"
"github.com/sirupsen/logrus"
ctx "golang.org/x/net/context"
Expand Down Expand Up @@ -444,36 +445,42 @@ func (e *etcdClient) DeletePrefix(path string) error {
// Watch starts watching for changes in a prefix
func (e *etcdClient) Watch(w *Watcher) {
lastRev := int64(0)
localCache := watcherCache{}
listSignalSent := false

scopedLog := log.WithFields(logrus.Fields{
fieldWatcher: w,
fieldPrefix: w.prefix,
})

reList:
for {
res, err := e.client.Get(ctx.Background(), w.prefix, client.WithPrefix(),
client.WithRev(lastRev), client.WithSerializable())
if err != nil {
log.WithFields(logrus.Fields{
fieldRev: lastRev,
fieldPrefix: w.prefix,
fieldWatcher: w,
}).WithError(err).Warn("Unable to list keys before starting watcher")
scopedLog.WithField(fieldRev, lastRev).WithError(err).Warn("Unable to list keys before starting watcher")
continue
}

lastRev := res.Header.Revision

log.WithFields(logrus.Fields{
fieldRev: lastRev,
fieldWatcher: w,
}).Debugf("List response from etcd len=%d: %+v", res.Count, res)
scopedLog = scopedLog.WithField(fieldRev, lastRev)
scopedLog.Debugf("List response from etcd len=%d: %+v", res.Count, res)

if res.Count > 0 {
for _, key := range res.Kvs {
log.WithFields(logrus.Fields{
fieldRev: lastRev,
fieldWatcher: w,
}).Debugf("Emiting list result as %v event for %s=%v", EventTypeCreate, key.Key, key.Value)
t := EventTypeCreate
if localCache.Exists(key.Key) {
t = EventTypeModify
}

localCache.MarkInUse(key.Key)
scopedLog.Debugf("Emiting list result as %v event for %s=%v", t, key.Key, key.Value)

w.Events <- KeyValueEvent{
Key: string(key.Key),
Value: key.Value,
Typ: EventTypeCreate,
Typ: t,
}
}
}
Expand All @@ -483,15 +490,29 @@ func (e *etcdClient) Watch(w *Watcher) {
continue
}

w.Events <- KeyValueEvent{Typ: EventTypeListDone}
// Send out deletion events for all keys that were deleted
// between our last known revision and the latest revision
// received via Get
localCache.RemoveDeleted(func(k string) {
event := KeyValueEvent{
Key: k,
Typ: EventTypeDelete,
}

scopedLog.Debugf("Emiting EventTypeDelete event for %s", k)
w.Events <- event
})

// Only send the list signal once
if !listSignalSent {
w.Events <- KeyValueEvent{Typ: EventTypeListDone}
listSignalSent = true
}

recreateWatcher:
lastRev++

log.WithFields(logrus.Fields{
fieldRev: lastRev,
fieldWatcher: w,
}).Debugf("Starting to watch %s", w.prefix)
scopedLog.WithField(fieldRev, lastRev).Debug("Starting to watch a prefix")
etcdWatch := e.client.Watch(ctx.Background(), w.prefix,
client.WithPrefix(), client.WithRev(lastRev))
for {
Expand All @@ -505,38 +526,50 @@ func (e *etcdClient) Watch(w *Watcher) {
goto recreateWatcher
}

lastRev = r.Header.Revision
scopedLog := scopedLog.WithField(fieldRev, r.Header.Revision)

if err := r.Err(); err != nil {
log.WithFields(logrus.Fields{
fieldRev: lastRev,
fieldWatcher: w,
}).WithError(err).Warningf("etcd watcher received error")
continue
// We tried to watch on a compacted
// revision that may no longer exist,
// recreate the watcher and try to
// watch on the next possible revision
if err == v3rpcErrors.ErrCompacted {
scopedLog.WithError(err).Debug("Tried watching on compacted revision")
}

// Retrieve latest revision
lastRev = 0

// mark all local keys in state for
// deletion unless the upcoming GET
// marks them alive
localCache.MarkAllForDeletion()

continue reList
}

log.WithFields(logrus.Fields{
fieldRev: lastRev,
fieldWatcher: w,
}).Debugf("Received event from etcd: %+v", r)
lastRev = r.Header.Revision
scopedLog.Debugf("Received event from etcd: %+v", r)

for _, ev := range r.Events {
event := KeyValueEvent{
Key: string(ev.Kv.Key),
Value: ev.Kv.Value,
Typ: EventTypeModify,
}

if ev.Type == client.EventTypeDelete {
switch {
case ev.Type == client.EventTypeDelete:
event.Typ = EventTypeDelete
} else if ev.IsCreate() {
localCache.RemoveKey(ev.Kv.Key)
case ev.IsCreate():
event.Typ = EventTypeCreate
localCache.MarkInUse(ev.Kv.Key)
default:
event.Typ = EventTypeModify
localCache.MarkInUse(ev.Kv.Key)
}

log.WithFields(logrus.Fields{
fieldRev: lastRev,
fieldWatcher: w,
}).Debugf("Emiting %v event for %s=%v", event.Typ, event.Key, event.Value)
scopedLog.Debugf("Emiting %v event for %s=%v", event.Typ, event.Key, event.Value)

w.Events <- event
}
Expand Down
52 changes: 52 additions & 0 deletions pkg/kvstore/watcher_cache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
// Copyright 2018 Authors of Cilium
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package kvstore

type watchState struct {
deletionMark bool
}

type watcherCache map[string]watchState

func (wc watcherCache) Exists(key []byte) bool {
if _, ok := wc[string(key)]; ok {
return true
}

return false
}

func (wc watcherCache) RemoveDeleted(f func(string)) {
for k, localKey := range wc {
if localKey.deletionMark {
f(k)
delete(wc, k)
}
}
}

func (wc watcherCache) MarkAllForDeletion() {
for k := range wc {
wc[k] = watchState{deletionMark: true}
}
}

func (wc watcherCache) MarkInUse(key []byte) {
wc[string(key)] = watchState{deletionMark: false}
}

func (wc watcherCache) RemoveKey(key []byte) {
delete(wc, string(key))
}

0 comments on commit 42aaeed

Please sign in to comment.