Skip to content
This repository has been archived by the owner on Oct 12, 2021. It is now read-only.

Commit

Permalink
Merge pull request #50 from WuLonghui/sroreadapter-reuse
Browse files Browse the repository at this point in the history
Reuse store adapter for Doppler
  • Loading branch information
jmtuley committed Apr 16, 2015
2 parents 0b5714c + 9002f61 commit ba7935f
Show file tree
Hide file tree
Showing 5 changed files with 48 additions and 30 deletions.
11 changes: 1 addition & 10 deletions src/doppler/doppler.go
Expand Up @@ -14,15 +14,13 @@ import (
"github.com/cloudfoundry/dropsonde/events"
"github.com/cloudfoundry/dropsonde/signature"
"github.com/cloudfoundry/gosteno"
"github.com/cloudfoundry/gunk/workpool"
"github.com/cloudfoundry/loggregatorlib/agentlistener"
"github.com/cloudfoundry/loggregatorlib/appservice"
"github.com/cloudfoundry/loggregatorlib/cfcomponent"
"github.com/cloudfoundry/loggregatorlib/cfcomponent/instrumentation"
"github.com/cloudfoundry/loggregatorlib/store"
"github.com/cloudfoundry/loggregatorlib/store/cache"
"github.com/cloudfoundry/storeadapter"
"github.com/cloudfoundry/storeadapter/etcdstoreadapter"
)

type Doppler struct {
Expand All @@ -49,13 +47,10 @@ type Doppler struct {
sync.WaitGroup
}

func New(host string, config *config.Config, logger *gosteno.Logger, dropsondeOrigin string) *Doppler {
func New(host string, config *config.Config, logger *gosteno.Logger, storeAdapter storeadapter.StoreAdapter, dropsondeOrigin string) *Doppler {
cfcomponent.Logger = logger
keepAliveInterval := 30 * time.Second

workPool := workpool.NewWorkPool(config.EtcdMaxConcurrentRequests)
storeAdapter := etcdstoreadapter.NewETCDStoreAdapter(config.EtcdUrls, workPool)
storeAdapter.Connect()
appStoreCache := cache.NewAppServiceCache()
appStoreWatcher, newAppServiceChan, deletedAppServiceChan := store.NewAppServiceStoreWatcher(storeAdapter, appStoreCache)

Expand Down Expand Up @@ -93,10 +88,6 @@ func (doppler *Doppler) Start() {
doppler.errChan = make(chan error)
doppler.Unlock()

err := doppler.storeAdapter.Connect()
if err != nil {
panic(err)
}
doppler.Add(7)

go func() {
Expand Down
3 changes: 2 additions & 1 deletion src/doppler/doppler_test.go
Expand Up @@ -22,7 +22,8 @@ var _ = Describe("Doppler Server", func() {
var dopplerInstance *doppler.Doppler

BeforeEach(func() {
dopplerInstance = doppler.New("127.0.0.1", dopplerConfig, loggertesthelper.Logger(), "dropsondeOrigin")
storeAdapter := doppler.NewStoreAdapter(dopplerConfig.EtcdUrls, dopplerConfig.EtcdMaxConcurrentRequests)
dopplerInstance = doppler.New("127.0.0.1", dopplerConfig, loggertesthelper.Logger(), storeAdapter, "dropsondeOrigin")
go dopplerInstance.Start()
time.Sleep(10 * time.Millisecond)
})
Expand Down
23 changes: 16 additions & 7 deletions src/doppler/main.go
Expand Up @@ -30,6 +30,8 @@ var (
configFile = flag.String("config", "config/doppler.json", "Location of the doppler config json file")
cpuprofile = flag.String("cpuprofile", "", "write cpu profile to file")
memprofile = flag.String("memprofile", "", "write memory profile to this file")

storeAdapter storeadapter.StoreAdapter
)

type DopplerServerHealthMonitor struct {
Expand All @@ -39,10 +41,15 @@ func (hm DopplerServerHealthMonitor) Ok() bool {
return true
}

var StoreAdapterProvider = func(urls []string, concurrentRequests int) storeadapter.StoreAdapter {
workPool := workpool.NewWorkPool(concurrentRequests)
func SetStoreAdapter(adapter storeadapter.StoreAdapter) {
storeAdapter = adapter
}

return etcdstoreadapter.NewETCDStoreAdapter(urls, workPool)
func NewStoreAdapter(urls []string, concurrentRequests int) storeadapter.StoreAdapter {
workPool := workpool.NewWorkPool(concurrentRequests)
etcdStoreAdapter := etcdstoreadapter.NewETCDStoreAdapter(urls, workPool)
etcdStoreAdapter.Connect()
return etcdStoreAdapter
}

func main() {
Expand Down Expand Up @@ -100,7 +107,8 @@ func main() {
panic(err)
}

doppler := New(localIp, conf, logger, "doppler")
storeAdapter = NewStoreAdapter(conf.EtcdUrls, conf.EtcdMaxConcurrentRequests)
doppler := New(localIp, conf, logger, storeAdapter, "doppler")

cfc, err := cfcomponent.NewComponent(
logger,
Expand Down Expand Up @@ -163,11 +171,12 @@ func StartHeartbeats(localIp string, ttl time.Duration, config *config.Config, l
return
}

adapter := StoreAdapterProvider(config.EtcdUrls, config.EtcdMaxConcurrentRequests)
adapter.Connect()
if storeAdapter == nil {
panic("store adapter is nil")
}

logger.Debugf("Starting Health Status Updates to Store: /healthstatus/doppler/%s/%s/%d", config.Zone, config.JobName, config.Index)
status, stopChan, err := adapter.MaintainNode(storeadapter.StoreNode{
status, stopChan, err := storeAdapter.MaintainNode(storeadapter.StoreNode{
Key: fmt.Sprintf("/healthstatus/doppler/%s/%s/%d", config.Zone, config.JobName, config.Index),
Value: []byte(localIp),
TTL: uint64(ttl.Seconds()),
Expand Down
2 changes: 2 additions & 0 deletions src/doppler/main_integration_test.go
Expand Up @@ -40,6 +40,8 @@ var _ = Describe("Etcd Integration tests", func() {
Zone: "z1",
ContainerMetricTTLSeconds: 120,
}

main.SetStoreAdapter(main.NewStoreAdapter(conf.EtcdUrls, conf.EtcdMaxConcurrentRequests))
})

AfterEach(func() {
Expand Down
39 changes: 27 additions & 12 deletions src/doppler/main_test.go
Expand Up @@ -7,7 +7,6 @@ import (
"doppler"

"github.com/cloudfoundry/loggregatorlib/loggertesthelper"
"github.com/cloudfoundry/storeadapter"
"github.com/cloudfoundry/storeadapter/fakestoreadapter"
"github.com/pivotal-golang/localip"

Expand All @@ -17,7 +16,6 @@ import (
. "github.com/onsi/gomega"
)

//Test ParseConfig
var _ = Describe("Main", func() {

Describe("ParseConfig", func() {
Expand Down Expand Up @@ -79,20 +77,17 @@ var _ = Describe("Main", func() {

Describe("StartHeartbeats", func() {
var adapter *fakestoreadapter.FakeStoreAdapter
var oldProvider func([]string, int) storeadapter.StoreAdapter

BeforeEach(func() {
adapter = fakestoreadapter.New()
oldProvider = main.StoreAdapterProvider
main.StoreAdapterProvider = func([]string, int) storeadapter.StoreAdapter {
return adapter
}
main.SetStoreAdapter(adapter)
})

AfterEach(func() {
main.StoreAdapterProvider = oldProvider
main.SetStoreAdapter(nil)
})

Context("with a valid ETCD conig", func() {
Context("when store adapter is nil", func() {
var conf config.Config
var localIp string

Expand All @@ -106,11 +101,31 @@ var _ = Describe("Main", func() {
Zone: "z1",
DropsondeIncomingMessagesPort: 1234,
}

main.SetStoreAdapter(nil)
})

It("connects to etcd", func() {
main.StartHeartbeats(localIp, time.Second, &conf, loggertesthelper.Logger())
Expect(adapter.DidConnect).To(BeTrue())
It("should panic", func() {
Expect(func() {
main.StartHeartbeats(localIp, time.Second, &conf, loggertesthelper.Logger())
}).Should(Panic())
})
})

Context("with a valid ETCD conig", func() {
var conf config.Config
var localIp string

BeforeEach(func() {
localIp, _ = localip.LocalIP()
conf = config.Config{
JobName: "doppler_z1",
Index: 0,
EtcdMaxConcurrentRequests: 10,
EtcdUrls: []string{"test:123", "test:456"},
Zone: "z1",
DropsondeIncomingMessagesPort: 1234,
}
})

It("sends a heartbeat to etcd", func() {
Expand Down

0 comments on commit ba7935f

Please sign in to comment.