From 42aaeed0639c84de7f84f7c62736db11e2b6cc33 Mon Sep 17 00:00:00 2001 From: Thomas Graf Date: Mon, 5 Mar 2018 06:51:35 -0800 Subject: [PATCH] etcd: Treat etcd watcher errors by restarting the watcher etcd does not support watching on a compacted revision and will error out. Do a fresh get on the latest revision and restart the watcher. In order to continue maintaining the proper order of events, a local cache is introduced. The ListDone signal is only emitted once at the beginning. On ReList, deletion events are sent for keys which can no longer be found. Fixes: #3010 Signed-off-by: Thomas Graf --- pkg/kvstore/etcd.go | 105 +++++++++++++++++++++++------------ pkg/kvstore/watcher_cache.go | 52 +++++++++++++++++ 2 files changed, 121 insertions(+), 36 deletions(-) create mode 100644 pkg/kvstore/watcher_cache.go diff --git a/pkg/kvstore/etcd.go b/pkg/kvstore/etcd.go index fb5443172cc0..954ff78711d4 100644 --- a/pkg/kvstore/etcd.go +++ b/pkg/kvstore/etcd.go @@ -31,6 +31,7 @@ import ( client "github.com/coreos/etcd/clientv3" "github.com/coreos/etcd/clientv3/concurrency" clientyaml "github.com/coreos/etcd/clientv3/yaml" + v3rpcErrors "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes" "github.com/hashicorp/go-version" "github.com/sirupsen/logrus" ctx "golang.org/x/net/context" @@ -444,36 +445,42 @@ func (e *etcdClient) DeletePrefix(path string) error { // Watch starts watching for changes in a prefix func (e *etcdClient) Watch(w *Watcher) { lastRev := int64(0) + localCache := watcherCache{} + listSignalSent := false + scopedLog := log.WithFields(logrus.Fields{ + fieldWatcher: w, + fieldPrefix: w.prefix, + }) + +reList: for { res, err := e.client.Get(ctx.Background(), w.prefix, client.WithPrefix(), client.WithRev(lastRev), client.WithSerializable()) if err != nil { - log.WithFields(logrus.Fields{ - fieldRev: lastRev, - fieldPrefix: w.prefix, - fieldWatcher: w, - }).WithError(err).Warn("Unable to list keys before starting watcher") + scopedLog.WithField(fieldRev, lastRev).WithError(err).Warn("Unable to list keys before starting watcher") continue } lastRev := res.Header.Revision - log.WithFields(logrus.Fields{ - fieldRev: lastRev, - fieldWatcher: w, - }).Debugf("List response from etcd len=%d: %+v", res.Count, res) + scopedLog = scopedLog.WithField(fieldRev, lastRev) + scopedLog.Debugf("List response from etcd len=%d: %+v", res.Count, res) if res.Count > 0 { for _, key := range res.Kvs { - log.WithFields(logrus.Fields{ - fieldRev: lastRev, - fieldWatcher: w, - }).Debugf("Emiting list result as %v event for %s=%v", EventTypeCreate, key.Key, key.Value) + t := EventTypeCreate + if localCache.Exists(key.Key) { + t = EventTypeModify + } + + localCache.MarkInUse(key.Key) + scopedLog.Debugf("Emiting list result as %v event for %s=%v", t, key.Key, key.Value) + w.Events <- KeyValueEvent{ Key: string(key.Key), Value: key.Value, - Typ: EventTypeCreate, + Typ: t, } } } @@ -483,15 +490,29 @@ func (e *etcdClient) Watch(w *Watcher) { continue } - w.Events <- KeyValueEvent{Typ: EventTypeListDone} + // Send out deletion events for all keys that were deleted + // between our last known revision and the latest revision + // received via Get + localCache.RemoveDeleted(func(k string) { + event := KeyValueEvent{ + Key: k, + Typ: EventTypeDelete, + } + + scopedLog.Debugf("Emiting EventTypeDelete event for %s", k) + w.Events <- event + }) + + // Only send the list signal once + if !listSignalSent { + w.Events <- KeyValueEvent{Typ: EventTypeListDone} + listSignalSent = true + } recreateWatcher: lastRev++ - log.WithFields(logrus.Fields{ - fieldRev: lastRev, - fieldWatcher: w, - }).Debugf("Starting to watch %s", w.prefix) + scopedLog.WithField(fieldRev, lastRev).Debug("Starting to watch a prefix") etcdWatch := e.client.Watch(ctx.Background(), w.prefix, client.WithPrefix(), client.WithRev(lastRev)) for { @@ -505,38 +526,50 @@ func (e *etcdClient) Watch(w *Watcher) { goto recreateWatcher } - lastRev = r.Header.Revision + scopedLog := scopedLog.WithField(fieldRev, r.Header.Revision) if err := r.Err(); err != nil { - log.WithFields(logrus.Fields{ - fieldRev: lastRev, - fieldWatcher: w, - }).WithError(err).Warningf("etcd watcher received error") - continue + // We tried to watch on a compacted + // revision that may no longer exist, + // recreate the watcher and try to + // watch on the next possible revision + if err == v3rpcErrors.ErrCompacted { + scopedLog.WithError(err).Debug("Tried watching on compacted revision") + } + + // Retrieve latest revision + lastRev = 0 + + // mark all local keys in state for + // deletion unless the upcoming GET + // marks them alive + localCache.MarkAllForDeletion() + + continue reList } - log.WithFields(logrus.Fields{ - fieldRev: lastRev, - fieldWatcher: w, - }).Debugf("Received event from etcd: %+v", r) + lastRev = r.Header.Revision + scopedLog.Debugf("Received event from etcd: %+v", r) for _, ev := range r.Events { event := KeyValueEvent{ Key: string(ev.Kv.Key), Value: ev.Kv.Value, - Typ: EventTypeModify, } - if ev.Type == client.EventTypeDelete { + switch { + case ev.Type == client.EventTypeDelete: event.Typ = EventTypeDelete - } else if ev.IsCreate() { + localCache.RemoveKey(ev.Kv.Key) + case ev.IsCreate(): event.Typ = EventTypeCreate + localCache.MarkInUse(ev.Kv.Key) + default: + event.Typ = EventTypeModify + localCache.MarkInUse(ev.Kv.Key) } - log.WithFields(logrus.Fields{ - fieldRev: lastRev, - fieldWatcher: w, - }).Debugf("Emiting %v event for %s=%v", event.Typ, event.Key, event.Value) + scopedLog.Debugf("Emiting %v event for %s=%v", event.Typ, event.Key, event.Value) w.Events <- event } diff --git a/pkg/kvstore/watcher_cache.go b/pkg/kvstore/watcher_cache.go new file mode 100644 index 000000000000..317a19224571 --- /dev/null +++ b/pkg/kvstore/watcher_cache.go @@ -0,0 +1,52 @@ +// Copyright 2018 Authors of Cilium +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package kvstore + +type watchState struct { + deletionMark bool +} + +type watcherCache map[string]watchState + +func (wc watcherCache) Exists(key []byte) bool { + if _, ok := wc[string(key)]; ok { + return true + } + + return false +} + +func (wc watcherCache) RemoveDeleted(f func(string)) { + for k, localKey := range wc { + if localKey.deletionMark { + f(k) + delete(wc, k) + } + } +} + +func (wc watcherCache) MarkAllForDeletion() { + for k := range wc { + wc[k] = watchState{deletionMark: true} + } +} + +func (wc watcherCache) MarkInUse(key []byte) { + wc[string(key)] = watchState{deletionMark: false} +} + +func (wc watcherCache) RemoveKey(key []byte) { + delete(wc, string(key)) +}