Skip to content
This repository has been archived by the owner on Jan 21, 2022. It is now read-only.

Commit

Permalink
simplify loggregator client pool
Browse files Browse the repository at this point in the history
- remove all asynchronicity
- address getter interface for easier testing

[#78156916]

Signed-off-by: Johannes Petzold <jpetzold@pivotal.io>
  • Loading branch information
Andrew Poydence authored and Johannes Petzold committed Sep 23, 2014
1 parent 74ac33c commit 3bf2372
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 185 deletions.
93 changes: 24 additions & 69 deletions clientpool/loggregator_client_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,29 +5,45 @@ import (
"fmt"
"github.com/cloudfoundry/gosteno"
"github.com/cloudfoundry/loggregatorlib/loggregatorclient"
"github.com/cloudfoundry/storeadapter"
"math/rand"
"sync"
"time"
"github.com/cloudfoundry/loggregatorlib/servicediscovery"
)

var ErrorEmptyClientPool = errors.New("loggregator client pool is empty")

type AddressGetter interface {
GetAddresses() []string
}

type LoggregatorClientPool struct {
clients map[string]*loggregatorclient.LoggregatorClient
logger *gosteno.Logger
loggregatorPort int
sync.RWMutex
serverAddressList servicediscovery.ServerAddressList
serverAddressGetter AddressGetter
}

func NewLoggregatorClientPool(logger *gosteno.Logger, port int) *LoggregatorClientPool {
func NewLoggregatorClientPool(logger *gosteno.Logger, port int, getter AddressGetter) *LoggregatorClientPool {
return &LoggregatorClientPool{
loggregatorPort: port,
clients: make(map[string]*loggregatorclient.LoggregatorClient),
logger: logger,
loggregatorPort: port,
clients: make(map[string]*loggregatorclient.LoggregatorClient),
logger: logger,
serverAddressGetter: getter,
}
}

func (pool *LoggregatorClientPool) ListClients() []loggregatorclient.LoggregatorClient {
pool.syncWithAddressList(pool.serverAddressGetter.GetAddresses())

pool.RLock()
defer pool.RUnlock()

val := make([]loggregatorclient.LoggregatorClient, 0, len(pool.clients))
for _, client := range pool.clients {
val = append(val, *client)
}

return val
}

func (pool *LoggregatorClientPool) RandomClient() (loggregatorclient.LoggregatorClient, error) {
Expand All @@ -39,24 +55,6 @@ func (pool *LoggregatorClientPool) RandomClient() (loggregatorclient.Loggregator
return list[rand.Intn(len(list))], nil
}

func (pool *LoggregatorClientPool) RunUpdateLoop(storeAdapter storeadapter.StoreAdapter, key string, stopChan <-chan struct{}, interval time.Duration) {
pool.serverAddressList = servicediscovery.NewServerAddressList(storeAdapter, key, pool.logger)
go pool.serverAddressList.Run(interval)

ticker := time.NewTicker(interval)
defer ticker.Stop()

for {
select {
case <-ticker.C:
pool.syncWithAddressList(pool.serverAddressList.GetAddresses())
case <-stopChan:
pool.serverAddressList.Stop()
return
}
}
}

func (pool *LoggregatorClientPool) syncWithAddressList(addresses []string) {
pool.Lock()
defer pool.Unlock()
Expand All @@ -76,50 +74,7 @@ func (pool *LoggregatorClientPool) syncWithAddressList(addresses []string) {
pool.clients = newClients
}

func (pool *LoggregatorClientPool) ListClients() []loggregatorclient.LoggregatorClient {
pool.RLock()
defer pool.RUnlock()

val := make([]loggregatorclient.LoggregatorClient, 0, len(pool.clients))
for _, client := range pool.clients {
val = append(val, *client)
}

return val
}

func (pool *LoggregatorClientPool) ListAddresses() []string {
pool.RLock()
defer pool.RUnlock()

val := make([]string, 0, len(pool.clients))
for addr := range pool.clients {
val = append(val, addr)
}

return val
}

func (pool *LoggregatorClientPool) Add(address string, client loggregatorclient.LoggregatorClient) {
pool.Lock()
defer pool.Unlock()

pool.clients[address] = &client
}

func (pool *LoggregatorClientPool) hasServerFor(addr string) bool {
_, ok := pool.clients[addr]
return ok
}

func leafNodes(root storeadapter.StoreNode) []storeadapter.StoreNode {
if !root.Dir {
return []storeadapter.StoreNode{root}
}

leaves := []storeadapter.StoreNode{}
for _, node := range root.ChildNodes {
leaves = append(leaves, leafNodes(node)...)
}
return leaves
}
152 changes: 36 additions & 116 deletions clientpool/loggregator_client_pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@ import (
"fmt"
steno "github.com/cloudfoundry/gosteno"
"github.com/cloudfoundry/loggregatorlib/loggregatorclient"
"github.com/cloudfoundry/storeadapter"
"github.com/cloudfoundry/storeadapter/fakestoreadapter"
"math/rand"
"time"

Expand All @@ -21,27 +19,45 @@ var _ = BeforeSuite(func() {

var _ = Describe("LoggregatorClientPool", func() {
var (
adapter *fakestoreadapter.FakeStoreAdapter
pool *clientpool.LoggregatorClientPool
stopChan chan struct{}
logger *steno.Logger
pool *clientpool.LoggregatorClientPool
logger *steno.Logger
fakeGetter *fakeAddressGetter
)

BeforeEach(func() {
adapter = fakestoreadapter.New()

stopChan = make(chan struct{})
logger = steno.NewLogger("TestLogger")
pool = clientpool.NewLoggregatorClientPool(logger, 3456)
fakeGetter = &fakeAddressGetter{}
pool = clientpool.NewLoggregatorClientPool(logger, 3456, fakeGetter)
})

Describe("ListClients", func() {
Context("with empty address list", func() {
It("returns an empty client list", func() {
fakeGetter.addresses = []string{}
Expect(pool.ListClients()).To(HaveLen(0))
})
})

Context("with a non-empty address list", func() {
It("returns a client for every address", func() {
fakeGetter.addresses = []string{"127.0.0.1", "127.0.0.2"}
Expect(pool.ListClients()).To(HaveLen(2))
})
})

It("re-uses existing clients", func() {
fakeGetter.addresses = []string{"127.0.0.1"}
client1 := pool.ListClients()[0]
client2 := pool.ListClients()[0]
Expect(client1).To(Equal(client2))
})
})

Describe("RandomClient", func() {
Context("with a non-empty client pool", func() {
It("chooses a client with roughly uniform distribution", func() {
for i := 0; i < 5; i++ {
addr := fmt.Sprintf("127.0.0.1:%d", i)
client := loggregatorclient.NewLoggregatorClient(addr, logger, loggregatorclient.DefaultBufferSize)
pool.Add(addr, client)
fakeGetter.addresses = append(fakeGetter.addresses, fmt.Sprintf("127.0.0.%d", i))
}

counts := make(map[loggregatorclient.LoggregatorClient]int)
Expand All @@ -64,108 +80,12 @@ var _ = Describe("LoggregatorClientPool", func() {
})
})

Describe("RunUpdateLoop", func() {
It("shuts down when stopChan is closed", func() {
doneChan := make(chan struct{})

go func() {
pool.RunUpdateLoop(adapter, "z1", stopChan, 10*time.Millisecond)
close(doneChan)
}()

close(stopChan)

Eventually(doneChan).Should(BeClosed())
})

Context("when store adds a server", func() {
var addServer = func() {
doneChan := make(chan struct{})

go func() {
pool.RunUpdateLoop(adapter, "z1", stopChan, 10*time.Millisecond)
close(doneChan)
}()

adapter.Create(storeadapter.StoreNode{
Key: "z1/loggregator_trafficcontroller_z1/0",
Value: []byte("127.0.0.1"),
})
}

It("a non-nil client eventually appears in the pool", func() {
defer close(stopChan)
pool = clientpool.NewLoggregatorClientPool(logger, 3456)

addServer()

Eventually(pool.ListClients).Should(HaveLen(1))
Expect(pool.ListClients()[0]).ToNot(BeNil())
})

It("adds more servers later", func() {
defer close(stopChan)

doneChan := make(chan struct{})

go func() {
pool.RunUpdateLoop(adapter, "z1", stopChan, 10*time.Millisecond)
close(doneChan)
}()

adapter.Create(storeadapter.StoreNode{
Key: "z1/loggregator_trafficcontroller_z1/0",
Value: []byte("127.0.0.1"),
})

Eventually(pool.ListClients).Should(HaveLen(1))

adapter.Create(storeadapter.StoreNode{
Key: "z1/loggregator_trafficcontroller_z1/1",
Value: []byte("127.0.0.2"),
})

Eventually(pool.ListClients).Should(HaveLen(2))
Eventually(pool.ListAddresses).Should(ConsistOf("127.0.0.1:3456", "127.0.0.2:3456"))
})

It("does not duplicate known servers", func() {
pool.Add("127.0.0.1", loggregatorclient.NewLoggregatorClient("127.0.0.1:1", logger, loggregatorclient.DefaultBufferSize))

defer close(stopChan)

doneChan := make(chan struct{})

go func() {
pool.RunUpdateLoop(adapter, "z1", stopChan, 10*time.Millisecond)
close(doneChan)
}()

adapter.Create(storeadapter.StoreNode{
Key: "z1/loggregator_trafficcontroller_z1/0",
Value: []byte("127.0.0.1"),
})

Eventually(pool.ListClients).Should(HaveLen(1))
Consistently(pool.ListClients).Should(HaveLen(1))
})
})

Context("when store removes a server", func() {
It("eventually disappears from the pool", func() {
pool.Add("127.0.0.1", loggregatorclient.NewLoggregatorClient("127.0.0.1:3456", logger, loggregatorclient.DefaultBufferSize))

defer close(stopChan)

doneChan := make(chan struct{})
})

go func() {
pool.RunUpdateLoop(adapter, "z1", stopChan, 10*time.Millisecond)
close(doneChan)
}()
type fakeAddressGetter struct {
addresses []string
}

Eventually(pool.ListClients).Should(HaveLen(0))
})
})
})
})
func (getter *fakeAddressGetter) GetAddresses() []string {
return getter.addresses
}

0 comments on commit 3bf2372

Please sign in to comment.