Skip to content
This repository has been archived by the owner on Jan 21, 2022. It is now read-only.

Commit

Permalink
add new implementation of collector registrar
Browse files Browse the repository at this point in the history
- send announce message on a periodic schedule
- do not subscribe to discover message

[#79030122]

Signed-off-by: Johannes Petzold <jpetzold@pivotal.io>
  • Loading branch information
Andrew Poydence committed Sep 24, 2014
1 parent 888b219 commit 5de5c4e
Show file tree
Hide file tree
Showing 4 changed files with 243 additions and 0 deletions.
29 changes: 29 additions & 0 deletions cfcomponent/registrars/collectorregistrar/collector_messages.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package collectorregistrar

import (
"fmt"
"github.com/cloudfoundry/loggregatorlib/cfcomponent"
)

const AnnounceComponentMessageSubject = "vcap.component.announce"
const DiscoverComponentMessageSubject = "vcap.component.discover"

type AnnounceComponentMessage struct {
Type string `json:"type"`
Index uint `json:"index"`
Host string `json:"host"`
UUID string `json:"uuid"`
Credentials []string `json:"credentials"`
}

func NewAnnounceComponentMessage(cfc cfcomponent.Component) (message *AnnounceComponentMessage) {
message = &AnnounceComponentMessage{
Type: cfc.Type,
Index: cfc.Index,
Host: fmt.Sprintf("%s:%d", cfc.IpAddress, cfc.StatusPort),
UUID: fmt.Sprintf("%d-%s", cfc.Index, cfc.UUID),
Credentials: cfc.StatusCredentials,
}

return message
}
80 changes: 80 additions & 0 deletions cfcomponent/registrars/collectorregistrar/collector_registrar.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package collectorregistrar

import (
"encoding/json"
"fmt"
"github.com/cloudfoundry/gosteno"
"github.com/cloudfoundry/loggregatorlib/cfcomponent"
"github.com/cloudfoundry/yagnats"
"time"
)

type ClientProvider func(*gosteno.Logger, *cfcomponent.Config) (yagnats.ApceraWrapperNATSClient, error)

type CollectorRegistrar interface {
Run(stopChan <-chan struct{})
}

type collectorRegistrar struct {
clientProvider ClientProvider
interval time.Duration
logger *gosteno.Logger
cfc cfcomponent.Component
client yagnats.ApceraWrapperNATSClient
config *cfcomponent.Config
}

func NewCollectorRegistrar(clientProvider ClientProvider, cfc cfcomponent.Component, interval time.Duration, config *cfcomponent.Config) CollectorRegistrar {
return &collectorRegistrar{
clientProvider: clientProvider,
logger: cfc.Logger,
cfc: cfc,
interval: interval,
config: config,
}
}

func (registrar *collectorRegistrar) Run(stopChan <-chan struct{}) {
ticker := time.NewTicker(registrar.interval)
defer ticker.Stop()

for {
select {
case <-stopChan:
return
case <-ticker.C:
err := registrar.announceMessage()
if err != nil {
if registrar.client != nil {
registrar.client.Disconnect()
registrar.client = nil
}
registrar.logger.Warn(err.Error())
}
}
}
}

func (registrar *collectorRegistrar) announceMessage() error {
if registrar.client == nil {
registrar.logger.Debugf("creating NATS client")

var err error
registrar.client, err = registrar.clientProvider(registrar.logger, registrar.config)
if err != nil {
return fmt.Errorf("Failed to create client: %s", err)
}
}

json, err := json.Marshal(NewAnnounceComponentMessage(registrar.cfc))
if err != nil {
return fmt.Errorf("Failed to marshal component message: %s", err)
}

err = registrar.client.Publish(AnnounceComponentMessageSubject, json)
if err != nil {
return fmt.Errorf("Failed to publish: %s", err)
}

return nil
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package collectorregistrar_test

import (
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"

"testing"
)

func TestCollectorregistrar(t *testing.T) {
RegisterFailHandler(Fail)
RunSpecs(t, "Collectorregistrar Suite")
}
121 changes: 121 additions & 0 deletions cfcomponent/registrars/collectorregistrar/collectorregistrar_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
package collectorregistrar_test

import (
"github.com/cloudfoundry/loggregatorlib/cfcomponent/registrars/collectorregistrar"

"errors"
"github.com/apcera/nats"
"github.com/cloudfoundry/gosteno"
"github.com/cloudfoundry/loggregatorlib/cfcomponent"
"github.com/cloudfoundry/loggregatorlib/loggertesthelper"
"github.com/cloudfoundry/yagnats"
"github.com/cloudfoundry/yagnats/fakeyagnats"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"sync/atomic"
"time"
)

var _ = Describe("Collectorregistrar", func() {
Describe("Run", func() {
var (
fakeClient *fakeClient
component cfcomponent.Component
registrar collectorregistrar.CollectorRegistrar
stopChan chan struct{}
doneChan chan struct{}
errorProvider func() error
fakeClientProviderCallCount int32
)

BeforeEach(func() {
fakeClient = newFakeClient()
component, _ = cfcomponent.NewComponent(loggertesthelper.Logger(), "compType", 3, nil, 9999, []string{"username", "password"}, nil)
component.UUID = "OurUUID"
errorProvider = func() error {
return nil
}
fakeClientProviderCallCount = 0
fakeClientProvider := func(*gosteno.Logger, *cfcomponent.Config) (yagnats.ApceraWrapperNATSClient, error) {
atomic.AddInt32(&fakeClientProviderCallCount, 1)
return fakeClient, errorProvider()
}
registrar = collectorregistrar.NewCollectorRegistrar(fakeClientProvider, component, 10*time.Millisecond, nil)
stopChan = make(chan struct{})
doneChan = make(chan struct{})

go func() {
defer close(doneChan)
registrar.Run(stopChan)
}()
})

AfterEach(func() {
close(stopChan)
Eventually(doneChan).Should(BeClosed())
})

Context("with no errors", func() {
It("periodically announces itself via NATS", func() {
var messages []*nats.Msg
Eventually(func() int {
messages = fakeClient.PublishedMessages(collectorregistrar.AnnounceComponentMessageSubject)
return len(messages)
}).Should(BeNumerically(">", 1))

for _, message := range messages {
Expect(message.Data).To(MatchRegexp(`^\{"type":"compType","index":3,"host":"[^:]*:9999","uuid":"3-OurUUID","credentials":\["username","password"\]\}$`))
}
})

It("reuses the client connection", func() {
Eventually(func() int32 { return atomic.LoadInt32(&fakeClientProviderCallCount) }).Should(BeEquivalentTo(1))
Consistently(func() int32 { return atomic.LoadInt32(&fakeClientProviderCallCount) }).Should(BeEquivalentTo(1))
})
})

Context("with errors", func() {
Context("from the client provider", func() {
BeforeEach(func() {
fakeError := errors.New("fake error")
errorProvider = func() error {
returnedError := fakeError
fakeError = nil
return returnedError
}
})

It("recovers when the client provider recovers", func() {
Eventually(fakeClient.PublishedMessageCount).Should(BeNumerically(">", 0))
})

It("disconnects the client", func() {
Eventually(fakeClient.Disconnected).Should(BeTrue())
})
})
})
})
})

type fakeClient struct {
*fakeyagnats.FakeApceraWrapper
disconnected bool
}

func newFakeClient() *fakeClient {
return &fakeClient{
FakeApceraWrapper: fakeyagnats.NewApceraClientWrapper(),
}
}

func (f *fakeClient) Disconnect() {
f.Lock()
defer f.Unlock()
f.disconnected = true
}

func (f *fakeClient) Disconnected() bool {
f.Lock()
defer f.Unlock()
return f.disconnected
}

0 comments on commit 5de5c4e

Please sign in to comment.