Skip to content
This repository has been archived by the owner on Jan 21, 2022. It is now read-only.

Commit

Permalink
Restart the watcher when there is an error sometimes
Browse files Browse the repository at this point in the history
[#76989538]

Signed-off-by: Johannes Tuchscherer <jtuchscherer@pivotallabs.com>
  • Loading branch information
John Tuley committed Aug 15, 2014
1 parent 7e6ea91 commit 0a5bce3
Show file tree
Hide file tree
Showing 4 changed files with 113 additions and 50 deletions.
36 changes: 0 additions & 36 deletions store/app_service_store_unit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,46 +4,10 @@ import (
"github.com/cloudfoundry/loggregatorlib/appservice"
. "github.com/cloudfoundry/loggregatorlib/store"
"github.com/cloudfoundry/loggregatorlib/store/cache"
"github.com/cloudfoundry/storeadapter"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
)

type FakeAdapter struct {
DeleteCount int
}

func (adapter *FakeAdapter) Connect() error { return nil }
func (adapter *FakeAdapter) Create(storeadapter.StoreNode) error { return nil }
func (adapter *FakeAdapter) Update(storeadapter.StoreNode) error { return nil }
func (adapter *FakeAdapter) CompareAndSwap(storeadapter.StoreNode, storeadapter.StoreNode) error {
return nil
}
func (adapter *FakeAdapter) CompareAndSwapByIndex(uint64, storeadapter.StoreNode) error {
return nil
}

func (adapter *FakeAdapter) SetMulti(nodes []storeadapter.StoreNode) error { return nil }
func (adapter *FakeAdapter) Get(key string) (storeadapter.StoreNode, error) {
return storeadapter.StoreNode{}, nil
}
func (adapter *FakeAdapter) ListRecursively(key string) (storeadapter.StoreNode, error) {
return storeadapter.StoreNode{}, nil
}
func (adapter *FakeAdapter) Delete(keys ...string) error {
adapter.DeleteCount++
return nil
}
func (adapter *FakeAdapter) CompareAndDelete(storeadapter.StoreNode) error { return nil }
func (adapter *FakeAdapter) UpdateDirTTL(key string, ttl uint64) error { return nil }
func (adapter *FakeAdapter) Watch(key string) (events <-chan storeadapter.WatchEvent, stop chan<- bool, errors <-chan error) {
return nil, nil, nil
}
func (adapter *FakeAdapter) Disconnect() error { return nil }
func (adapter *FakeAdapter) MaintainNode(storeNode storeadapter.StoreNode) (lostNode <-chan bool, releaseNode chan chan bool, err error) {
return nil, nil, nil
}

var _ = Describe("AppServiceUnit", func() {
Context("when the store has data", func() {
var store *AppServiceStore
Expand Down
48 changes: 34 additions & 14 deletions store/app_service_store_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,23 +64,43 @@ func (w *AppServiceStoreWatcher) Run() {

w.registerExistingServicesFromStore()

events, _, _ := w.adapter.Watch("/loggregator/services")

for event := range events {
cfcomponent.Logger.Debugf("AppStoreWatcher: Got an event from store %s", event.Type)
switch event.Type {
case storeadapter.CreateEvent, storeadapter.UpdateEvent:
if event.Node.Dir || len(event.Node.Value) == 0 {
// we can ignore any directory nodes (app or other namespace additions)
continue
for {
events, stopChan, errChan := w.adapter.Watch("/loggregator/services")

InnerLoop:
for {
select {
case err := <-errChan:
if err == nil {
return
}
cfcomponent.Logger.Errorf("AppStoreWatcher: Got error while waiting for ETCD events: %s", err.Error())
close(stopChan)
break InnerLoop

case event, ok := <-events:
if !ok {
return
}

cfcomponent.Logger.Debugf("AppStoreWatcher: Got an event from store %s", event.Type)
switch event.Type {
case storeadapter.CreateEvent, storeadapter.UpdateEvent:
if event.Node.Dir || len(event.Node.Value) == 0 {
// we can ignore any directory nodes (app or other namespace additions)
continue
}
w.Add(appServiceFromStoreNode(*(event.Node)))
case storeadapter.DeleteEvent:
w.deleteEvent(*(event.PrevNode))
case storeadapter.ExpireEvent:
w.deleteEvent(*(event.PrevNode))
}
}
w.Add(appServiceFromStoreNode(*(event.Node)))
case storeadapter.DeleteEvent:
w.deleteEvent(*(event.PrevNode))
case storeadapter.ExpireEvent:
w.deleteEvent(*(event.PrevNode))
}

}

}

func (w *AppServiceStoreWatcher) registerExistingServicesFromStore() {
Expand Down
30 changes: 30 additions & 0 deletions store/app_service_store_watcher_unit_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package store_test

import (
"errors"
. "github.com/cloudfoundry/loggregatorlib/store"
"github.com/cloudfoundry/loggregatorlib/store/cache"
"github.com/cloudfoundry/storeadapter"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"sync"
)

var _ = Describe("AppServiceStoreWatcherUnit", func() {
Context("when there is an error", func() {
var adapter *FakeAdapter

BeforeEach(func() {
adapter = &FakeAdapter{}
watcher, _, _ := NewAppServiceStoreWatcher(adapter, cache.NewAppServiceCache())

go watcher.Run()
})

It("calls watch again", func() {
Eventually(adapter.GetWatchCounter).Should(Equal(1))
adapter.WatchErrChannel <- errors.New("Haha")
Eventually(adapter.GetWatchCounter).Should(Equal(2))
})
})
})
49 changes: 49 additions & 0 deletions store/store_suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,3 +66,52 @@ func buildNode(appService appservice.AppService) storeadapter.StoreNode {
Value: []byte(appService.Url),
}
}

type FakeAdapter struct {
DeleteCount int
WatchErrChannel chan error
WatchCounter int
sync.Mutex
}

func (adapter *FakeAdapter) GetWatchCounter() int {
adapter.Lock()
defer adapter.Unlock()
return adapter.WatchCounter
}

func (adapter *FakeAdapter) Connect() error { return nil }
func (adapter *FakeAdapter) Create(storeadapter.StoreNode) error { return nil }
func (adapter *FakeAdapter) Update(storeadapter.StoreNode) error { return nil }
func (adapter *FakeAdapter) CompareAndSwap(storeadapter.StoreNode, storeadapter.StoreNode) error {
return nil
}
func (adapter *FakeAdapter) CompareAndSwapByIndex(uint64, storeadapter.StoreNode) error {
return nil
}

func (adapter *FakeAdapter) SetMulti(nodes []storeadapter.StoreNode) error { return nil }
func (adapter *FakeAdapter) Get(key string) (storeadapter.StoreNode, error) {
return storeadapter.StoreNode{}, nil
}
func (adapter *FakeAdapter) ListRecursively(key string) (storeadapter.StoreNode, error) {
return storeadapter.StoreNode{}, nil
}
func (adapter *FakeAdapter) Delete(keys ...string) error {
adapter.DeleteCount++
return nil
}
func (adapter *FakeAdapter) CompareAndDelete(storeadapter.StoreNode) error { return nil }
func (adapter *FakeAdapter) UpdateDirTTL(key string, ttl uint64) error { return nil }
func (adapter *FakeAdapter) Watch(key string) (events <-chan storeadapter.WatchEvent, stop chan<- bool, errors <-chan error) {
adapter.Lock()
defer adapter.Unlock()
adapter.WatchCounter++
adapter.WatchErrChannel = make(chan error, 1)

return nil, make(chan bool), adapter.WatchErrChannel
}
func (adapter *FakeAdapter) Disconnect() error { return nil }
func (adapter *FakeAdapter) MaintainNode(storeNode storeadapter.StoreNode) (lostNode <-chan bool, releaseNode chan chan bool, err error) {
return nil, nil, nil
}

0 comments on commit 0a5bce3

Please sign in to comment.