Skip to content
This repository has been archived by the owner on Jan 21, 2022. It is now read-only.

Commit

Permalink
Revert "add stopChan to storeWatcher.Run()"
Browse files Browse the repository at this point in the history
This reverts commit d90ccfc.
  • Loading branch information
Joseph Rodriguez committed Aug 15, 2014
1 parent d90ccfc commit ce79297
Show file tree
Hide file tree
Showing 4 changed files with 13 additions and 38 deletions.
8 changes: 1 addition & 7 deletions store/app_service_store_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ var _ = Describe("AppServiceStoreIntegration", func() {
incomingChan chan appservice.AppServices
outAddChan <-chan appservice.AppService
outRemoveChan <-chan appservice.AppService
stopChan chan struct{}
)

BeforeEach(func() {
Expand All @@ -23,17 +22,12 @@ var _ = Describe("AppServiceStoreIntegration", func() {
c := cache.NewAppServiceCache()
var watcher *AppServiceStoreWatcher
watcher, outAddChan, outRemoveChan = NewAppServiceStoreWatcher(adapter, c)
stopChan = make(chan struct{})
go watcher.Run(stopChan)
go watcher.Run()

store := NewAppServiceStore(adapter, watcher)
go store.Run(incomingChan)
})

AfterEach(func() {
close(stopChan)
})

It("should receive, store, and republish AppServices", func(done Done) {
appServices := appservice.AppServices{AppId: "12345", Urls: []string{"syslog://foo"}}
incomingChan <- appServices
Expand Down
15 changes: 7 additions & 8 deletions store/app_service_store_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func (w *AppServiceStoreWatcher) Exists(appService appservice.AppService) bool {
return w.cache.Exists(appService)
}

func (w *AppServiceStoreWatcher) Run(outerStopChan <-chan struct{}) {
func (w *AppServiceStoreWatcher) Run() {
defer func() {
close(w.outAddChan)
close(w.outRemoveChan)
Expand All @@ -70,17 +70,17 @@ func (w *AppServiceStoreWatcher) Run(outerStopChan <-chan struct{}) {
InnerLoop:
for {
select {
case err, ok := <-errChan:
if !ok {
break InnerLoop
case err := <-errChan:
if err == nil {
return
}
cfcomponent.Logger.Errorf("AppStoreWatcher: Got error while waiting for ETCD events: %s", err.Error())
close(stopChan)
break InnerLoop

case event, ok := <-events:
if !ok {
break InnerLoop
return
}

cfcomponent.Logger.Debugf("AppStoreWatcher: Got an event from store %s", event.Type)
Expand All @@ -96,12 +96,11 @@ func (w *AppServiceStoreWatcher) Run(outerStopChan <-chan struct{}) {
case storeadapter.ExpireEvent:
w.deleteEvent(*(event.PrevNode))
}

case <-outerStopChan:
return
}
}

}

}

func (w *AppServiceStoreWatcher) registerExistingServicesFromStore() {
Expand Down
20 changes: 4 additions & 16 deletions store/app_service_store_watcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ var _ = Describe("AppServiceStoreWatcher", func() {
var adapter storeadapter.StoreAdapter
var outAddChan <-chan appservice.AppService
var outRemoveChan <-chan appservice.AppService
var stopChan chan struct{}

var app1Service1 appservice.AppService
var app1Service2 appservice.AppService
Expand All @@ -49,7 +48,6 @@ var _ = Describe("AppServiceStoreWatcher", func() {

c := cache.NewAppServiceCache()
watcher, outAddChan, outRemoveChan = NewAppServiceStoreWatcher(adapter, c)
stopChan = make(chan struct{})
})

AfterEach(func() {
Expand All @@ -60,11 +58,10 @@ var _ = Describe("AppServiceStoreWatcher", func() {

Describe("Shutdown", func() {
It("should close the outgoing channels", func() {
go watcher.Run(stopChan)
go watcher.Run()
time.Sleep(500 * time.Millisecond)
adapter.Disconnect()

close(stopChan)
Eventually(outRemoveChan).Should(BeClosed())
Eventually(outAddChan).Should(BeClosed())
})
Expand All @@ -73,8 +70,7 @@ var _ = Describe("AppServiceStoreWatcher", func() {
Describe("Loading watcher state on startup", func() {
Context("when the store is empty", func() {
It("should not send anything on the output channels", func() {
go watcher.Run(stopChan)
defer close(stopChan)
go watcher.Run()

Consistently(outAddChan).Should(BeEmpty())
Consistently(outRemoveChan).Should(BeEmpty())
Expand All @@ -89,8 +85,7 @@ var _ = Describe("AppServiceStoreWatcher", func() {
})

It("should send all the AppServices on the output add channel", func(done Done) {
go watcher.Run(stopChan)
defer close(stopChan)
go watcher.Run()

appServices := drainOutgoingChannel(outAddChan, 3)

Expand All @@ -106,23 +101,16 @@ var _ = Describe("AppServiceStoreWatcher", func() {
})

Describe("when the store has data and watcher is bootstrapped", func() {
var stopChan chan struct{}

BeforeEach(func(done Done) {
adapter.Create(buildNode(app1Service1))
adapter.Create(buildNode(app1Service2))
adapter.Create(buildNode(app2Service1))

stopChan = make(chan struct{})
go watcher.Run(stopChan)
go watcher.Run()
drainOutgoingChannel(outAddChan, 3)
close(done)
})

AfterEach(func() {
close(stopChan)
})

It("does not send updates when the data has already been processed", func(done Done) {
adapter.Create(buildNode(app1Service1))
adapter.Create(buildNode(app1Service2))
Expand Down
8 changes: 1 addition & 7 deletions store/app_service_store_watcher_unit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,18 +11,12 @@ import (
var _ = Describe("AppServiceStoreWatcherUnit", func() {
Context("when there is an error", func() {
var adapter *FakeAdapter
var stopChan chan struct{}

BeforeEach(func() {
adapter = &FakeAdapter{}
watcher, _, _ := NewAppServiceStoreWatcher(adapter, cache.NewAppServiceCache())

stopChan = make(chan struct{})
go watcher.Run(stopChan)
})

AfterEach(func() {
close(stopChan)
go watcher.Run()
})

It("calls watch again", func() {
Expand Down

0 comments on commit ce79297

Please sign in to comment.