Skip to content
This repository has been archived by the owner on Jan 21, 2022. It is now read-only.

Commit

Permalink
Add servicediscovery package with ServerAddressList
Browse files Browse the repository at this point in the history
[#75365570]

Signed-off-by: John Tuley <jtuley@pivotal.io>
  • Loading branch information
Mike Gehard committed Sep 3, 2014
1 parent b8f5328 commit 24462c4
Show file tree
Hide file tree
Showing 3 changed files with 253 additions and 0 deletions.
95 changes: 95 additions & 0 deletions servicediscovery/servicediscovery.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
package servicediscovery

import (
"github.com/cloudfoundry/gosteno"
"github.com/cloudfoundry/storeadapter"
"sync"
"time"
)

type ServerAddressList interface {
Run(updateInterval time.Duration)
Stop()
GetAddresses() []string
}

type serverAddressList struct {
addresses []string
storeAdapter storeadapter.StoreAdapter
stopChan chan struct{}
storeKeyPrefix string
logger *gosteno.Logger
sync.RWMutex
}

func NewServerAddressList(storeAdapter storeadapter.StoreAdapter, storeKeyPrefix string, logger *gosteno.Logger) ServerAddressList {
return &serverAddressList{
storeAdapter: storeAdapter,
addresses: []string{},
stopChan: make(chan struct{}),
storeKeyPrefix: storeKeyPrefix,
logger: logger,
}
}

func (list *serverAddressList) Run(updateInterval time.Duration) {
ticker := time.NewTicker(updateInterval)
defer ticker.Stop()

for {
select {
case <-list.stopChan:
return
case <-ticker.C:
node, err := list.storeAdapter.ListRecursively(list.storeKeyPrefix)

if err == storeadapter.ErrorKeyNotFound {
list.logger.Debugf("ServerAddressList.Run: Unable to recursively find keys with prefix %s", list.storeKeyPrefix)
continue
}

if err != nil {
panic(err) //FIXME: understand error modes and recovery cases better
}

leaves := leafNodes(node)

addresses := []string{}

for _, leaf := range leaves {
addresses = append(addresses, string(leaf.Value))
}

list.Lock()
list.addresses = addresses
list.Unlock()
}
}
}

func (list *serverAddressList) Stop() {
close(list.stopChan)
}

func (list *serverAddressList) GetAddresses() []string {
list.RLock()
defer list.RUnlock()

return list.addresses
}

func leafNodes(root storeadapter.StoreNode) []storeadapter.StoreNode {
if !root.Dir {
if len(root.Value) == 0 {
return []storeadapter.StoreNode{}
} else {
return []storeadapter.StoreNode{root}
}
}

leaves := []storeadapter.StoreNode{}
for _, node := range root.ChildNodes {
leaves = append(leaves, leafNodes(node)...)
}
return leaves
}
13 changes: 13 additions & 0 deletions servicediscovery/servicediscovery_suite_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package servicediscovery_test

import (
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"

"testing"
)

func TestServicediscovery(t *testing.T) {
RegisterFailHandler(Fail)
RunSpecs(t, "ServiceDiscovery Suite")
}
145 changes: 145 additions & 0 deletions servicediscovery/servicediscovery_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
package servicediscovery_test

import (
"github.com/cloudfoundry/loggregatorlib/loggertesthelper"
"github.com/cloudfoundry/loggregatorlib/servicediscovery"
"github.com/cloudfoundry/storeadapter"
"github.com/cloudfoundry/storeadapter/fakestoreadapter"
"time"

. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
)

var _ = Describe("ServiceDiscovery", func() {
var (
storeAdapter *fakestoreadapter.FakeStoreAdapter
list servicediscovery.ServerAddressList
)

BeforeEach(func() {
storeAdapter = fakestoreadapter.New()
list = servicediscovery.NewServerAddressList(storeAdapter, "/healthstatus/loggregator", loggertesthelper.Logger())
})

AfterEach(func() {
list.Stop()
})

It("gets the state of the world at startup", func() {
node := storeadapter.StoreNode{
Key: "/healthstatus/loggregator/z1/loggregator_z1",
Value: []byte("10.0.0.1"),
}
storeAdapter.Create(node)

go list.Run(1 * time.Millisecond)

expectedAddresses := []string{"10.0.0.1"}

Eventually(list.GetAddresses).Should(ConsistOf(expectedAddresses))
})

It("adds servers that appear later", func() {
go list.Run(1 * time.Millisecond)

Expect(list.GetAddresses()).To(BeEmpty())

node := storeadapter.StoreNode{
Key: "/healthstatus/loggregator/z1/loggregator_z1",
Value: []byte("10.0.0.1"),
}
storeAdapter.Create(node)

expectedAddresses := []string{"10.0.0.1"}

Eventually(list.GetAddresses).Should(ConsistOf(expectedAddresses))
})

It("removes servers that disappear later", func() {
node := storeadapter.StoreNode{
Key: "/healthstatus/loggregator/z1/loggregator_z1",
Value: []byte("10.0.0.1"),
}

storeAdapter.Create(node)

list := servicediscovery.NewServerAddressList(storeAdapter, "/healthstatus/loggregator", loggertesthelper.Logger())

go list.Run(1 * time.Millisecond)

storeAdapter.Delete("/healthstatus/loggregator/z1/loggregator_z1")

Eventually(list.GetAddresses).Should(BeEmpty())
})

It("only finds nodes for the server type", func() {
node := storeadapter.StoreNode{
Key: "/healthstatus/loggregator/z1/loggregator_z1",
Value: []byte("10.0.0.1"),
}

storeAdapter.Create(node)

node = storeadapter.StoreNode{
Key: "/healthstatus/router/z1/router_z1",
Value: []byte("10.99.99.99"),
}

storeAdapter.Create(node)

go list.Run(1 * time.Millisecond)

expectedAddresses := []string{"10.0.0.1"}

Eventually(list.GetAddresses).Should(ConsistOf(expectedAddresses))
})

It("only returns one copy of each server", func() {
node := storeadapter.StoreNode{
Key: "/healthstatus/loggregator/z1/loggregator_z1",
Value: []byte("10.0.0.1"),
}

storeAdapter.Create(node)

go list.Run(1 * time.Millisecond)

expectedAddresses := []string{"10.0.0.1"}

Eventually(list.GetAddresses).Should(ConsistOf(expectedAddresses))
Consistently(list.GetAddresses).Should(HaveLen(1))
})

It("continues to run if the key is not found", func() {
node := storeadapter.StoreNode{
Key: "/healthstatus/loggregator/z1/loggregator_z1",
Value: []byte("10.0.0.1"),
}

storeAdapter.Create(node)

go list.Run(1 * time.Millisecond)

Eventually(list.GetAddresses).Should(HaveLen(1))

storeAdapter.Lock()
storeAdapter.ListErrInjector = fakestoreadapter.NewFakeStoreAdapterErrorInjector("", storeadapter.ErrorKeyNotFound)
storeAdapter.Unlock()

Consistently(list.GetAddresses).Should(HaveLen(1))
})

It("excludes nodes with no value", func() {
node := storeadapter.StoreNode{
Key: "/healthstatus/loggregator/z1/loggregator_z1",
Value: []byte{},
}

storeAdapter.Create(node)

go list.Run(1 * time.Millisecond)

Consistently(list.GetAddresses).Should(BeEmpty())
})
})

0 comments on commit 24462c4

Please sign in to comment.