From 0a5bce3eb4d0e834d0403cf9ebe928e06d838f53 Mon Sep 17 00:00:00 2001 From: John Tuley Date: Thu, 14 Aug 2014 18:47:14 -0600 Subject: [PATCH] Restart the watcher when there is an error sometimes [#76989538] Signed-off-by: Johannes Tuchscherer --- store/app_service_store_unit_test.go | 36 -------------- store/app_service_store_watcher.go | 48 +++++++++++++------ store/app_service_store_watcher_unit_test.go | 30 ++++++++++++ store/store_suite_test.go | 49 ++++++++++++++++++++ 4 files changed, 113 insertions(+), 50 deletions(-) create mode 100644 store/app_service_store_watcher_unit_test.go diff --git a/store/app_service_store_unit_test.go b/store/app_service_store_unit_test.go index 121d115..73917c3 100644 --- a/store/app_service_store_unit_test.go +++ b/store/app_service_store_unit_test.go @@ -4,46 +4,10 @@ import ( "github.com/cloudfoundry/loggregatorlib/appservice" . "github.com/cloudfoundry/loggregatorlib/store" "github.com/cloudfoundry/loggregatorlib/store/cache" - "github.com/cloudfoundry/storeadapter" . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" ) -type FakeAdapter struct { - DeleteCount int -} - -func (adapter *FakeAdapter) Connect() error { return nil } -func (adapter *FakeAdapter) Create(storeadapter.StoreNode) error { return nil } -func (adapter *FakeAdapter) Update(storeadapter.StoreNode) error { return nil } -func (adapter *FakeAdapter) CompareAndSwap(storeadapter.StoreNode, storeadapter.StoreNode) error { - return nil -} -func (adapter *FakeAdapter) CompareAndSwapByIndex(uint64, storeadapter.StoreNode) error { - return nil -} - -func (adapter *FakeAdapter) SetMulti(nodes []storeadapter.StoreNode) error { return nil } -func (adapter *FakeAdapter) Get(key string) (storeadapter.StoreNode, error) { - return storeadapter.StoreNode{}, nil -} -func (adapter *FakeAdapter) ListRecursively(key string) (storeadapter.StoreNode, error) { - return storeadapter.StoreNode{}, nil -} -func (adapter *FakeAdapter) Delete(keys ...string) error { - adapter.DeleteCount++ - return nil -} -func (adapter *FakeAdapter) CompareAndDelete(storeadapter.StoreNode) error { return nil } -func (adapter *FakeAdapter) UpdateDirTTL(key string, ttl uint64) error { return nil } -func (adapter *FakeAdapter) Watch(key string) (events <-chan storeadapter.WatchEvent, stop chan<- bool, errors <-chan error) { - return nil, nil, nil -} -func (adapter *FakeAdapter) Disconnect() error { return nil } -func (adapter *FakeAdapter) MaintainNode(storeNode storeadapter.StoreNode) (lostNode <-chan bool, releaseNode chan chan bool, err error) { - return nil, nil, nil -} - var _ = Describe("AppServiceUnit", func() { Context("when the store has data", func() { var store *AppServiceStore diff --git a/store/app_service_store_watcher.go b/store/app_service_store_watcher.go index 656deb4..6ed8c65 100644 --- a/store/app_service_store_watcher.go +++ b/store/app_service_store_watcher.go @@ -64,23 +64,43 @@ func (w *AppServiceStoreWatcher) Run() { w.registerExistingServicesFromStore() - events, _, _ := w.adapter.Watch("/loggregator/services") - - for event := range events { - cfcomponent.Logger.Debugf("AppStoreWatcher: Got an event from store %s", event.Type) - switch event.Type { - case storeadapter.CreateEvent, storeadapter.UpdateEvent: - if event.Node.Dir || len(event.Node.Value) == 0 { - // we can ignore any directory nodes (app or other namespace additions) - continue + for { + events, stopChan, errChan := w.adapter.Watch("/loggregator/services") + + InnerLoop: + for { + select { + case err := <-errChan: + if err == nil { + return + } + cfcomponent.Logger.Errorf("AppStoreWatcher: Got error while waiting for ETCD events: %s", err.Error()) + close(stopChan) + break InnerLoop + + case event, ok := <-events: + if !ok { + return + } + + cfcomponent.Logger.Debugf("AppStoreWatcher: Got an event from store %s", event.Type) + switch event.Type { + case storeadapter.CreateEvent, storeadapter.UpdateEvent: + if event.Node.Dir || len(event.Node.Value) == 0 { + // we can ignore any directory nodes (app or other namespace additions) + continue + } + w.Add(appServiceFromStoreNode(*(event.Node))) + case storeadapter.DeleteEvent: + w.deleteEvent(*(event.PrevNode)) + case storeadapter.ExpireEvent: + w.deleteEvent(*(event.PrevNode)) + } } - w.Add(appServiceFromStoreNode(*(event.Node))) - case storeadapter.DeleteEvent: - w.deleteEvent(*(event.PrevNode)) - case storeadapter.ExpireEvent: - w.deleteEvent(*(event.PrevNode)) } + } + } func (w *AppServiceStoreWatcher) registerExistingServicesFromStore() { diff --git a/store/app_service_store_watcher_unit_test.go b/store/app_service_store_watcher_unit_test.go new file mode 100644 index 0000000..c73a8c2 --- /dev/null +++ b/store/app_service_store_watcher_unit_test.go @@ -0,0 +1,30 @@ +package store_test + +import ( + "errors" + . "github.com/cloudfoundry/loggregatorlib/store" + "github.com/cloudfoundry/loggregatorlib/store/cache" + "github.com/cloudfoundry/storeadapter" + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" + "sync" +) + +var _ = Describe("AppServiceStoreWatcherUnit", func() { + Context("when there is an error", func() { + var adapter *FakeAdapter + + BeforeEach(func() { + adapter = &FakeAdapter{} + watcher, _, _ := NewAppServiceStoreWatcher(adapter, cache.NewAppServiceCache()) + + go watcher.Run() + }) + + It("calls watch again", func() { + Eventually(adapter.GetWatchCounter).Should(Equal(1)) + adapter.WatchErrChannel <- errors.New("Haha") + Eventually(adapter.GetWatchCounter).Should(Equal(2)) + }) + }) +}) diff --git a/store/store_suite_test.go b/store/store_suite_test.go index 4294236..889b1fd 100644 --- a/store/store_suite_test.go +++ b/store/store_suite_test.go @@ -66,3 +66,52 @@ func buildNode(appService appservice.AppService) storeadapter.StoreNode { Value: []byte(appService.Url), } } + +type FakeAdapter struct { + DeleteCount int + WatchErrChannel chan error + WatchCounter int + sync.Mutex +} + +func (adapter *FakeAdapter) GetWatchCounter() int { + adapter.Lock() + defer adapter.Unlock() + return adapter.WatchCounter +} + +func (adapter *FakeAdapter) Connect() error { return nil } +func (adapter *FakeAdapter) Create(storeadapter.StoreNode) error { return nil } +func (adapter *FakeAdapter) Update(storeadapter.StoreNode) error { return nil } +func (adapter *FakeAdapter) CompareAndSwap(storeadapter.StoreNode, storeadapter.StoreNode) error { + return nil +} +func (adapter *FakeAdapter) CompareAndSwapByIndex(uint64, storeadapter.StoreNode) error { + return nil +} + +func (adapter *FakeAdapter) SetMulti(nodes []storeadapter.StoreNode) error { return nil } +func (adapter *FakeAdapter) Get(key string) (storeadapter.StoreNode, error) { + return storeadapter.StoreNode{}, nil +} +func (adapter *FakeAdapter) ListRecursively(key string) (storeadapter.StoreNode, error) { + return storeadapter.StoreNode{}, nil +} +func (adapter *FakeAdapter) Delete(keys ...string) error { + adapter.DeleteCount++ + return nil +} +func (adapter *FakeAdapter) CompareAndDelete(storeadapter.StoreNode) error { return nil } +func (adapter *FakeAdapter) UpdateDirTTL(key string, ttl uint64) error { return nil } +func (adapter *FakeAdapter) Watch(key string) (events <-chan storeadapter.WatchEvent, stop chan<- bool, errors <-chan error) { + adapter.Lock() + defer adapter.Unlock() + adapter.WatchCounter++ + adapter.WatchErrChannel = make(chan error, 1) + + return nil, make(chan bool), adapter.WatchErrChannel +} +func (adapter *FakeAdapter) Disconnect() error { return nil } +func (adapter *FakeAdapter) MaintainNode(storeNode storeadapter.StoreNode) (lostNode <-chan bool, releaseNode chan chan bool, err error) { + return nil, nil, nil +}